Schemaerkennung und Schemaentwicklung für den Kafka-Konnektor mit Snowpipe Streaming

Der Kafka-Konnektor mit Snowpipe Streaming unterstützt Schemaerkennung und Schemaentwicklung. Die Struktur von Tabellen in Snowflake kann automatisch definiert und weiterentwickelt werden, um die Struktur neuer Snowpipe Streaming-Daten zu unterstützen, die vom Kafka-Konnektor geladen werden.

Ohne Schemaerkennung und Schemaentwicklung besteht die vom Kafka-Konnektor geladene Snowflake-Tabelle nur aus zwei VARIANT-Spalten: RECORD_CONTENT und RECORD_METADATA. Wenn Schemaerkennung und Schemaentwicklung aktiviert sind, kann Snowflake das Schema der Streaming-Daten erkennen und Daten in Tabellen laden, die automatisch mit jedem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den neuen Datendateien fehlen.

Bemerkung

Dieses Vorschau-Feature funktioniert nur für den Kafka-Konnektor mit Snowpipe Streaming. Es unterstützt derzeit nicht den Kafka-Konnektor mit dateibasierter Snowpipe.

Unter diesem Thema:

Voraussetzungen

Bevor Sie dieses Feature aktivieren, müssen Sie die folgenden Vorbereitungen treffen:

  • Laden Sie den Kafka-Konnektor, Version 2.0.0 oder höher herunter. Weitere Informationen dazu finden Sie unter Installieren und Konfigurieren des Kafka-Konnektors.

  • Verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION für die Tabelle auf TRUE zu setzen. Sie müssen außerdem eine Rolle verwenden, die über die EVOLVE SCHEMA-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.

Konfigurieren der erforderlichen Kafka-Eigenschaften

Konfigurieren Sie die folgenden erforderlichen Eigenschaften in der Eigenschaftendatei Ihres Kafka-Konnektors:

snowflake.ingestion.method

Geben Sie an, dass SNOWPIPE_STREAMING zum Laden Ihrer Kafka-Topic-Daten verwendet werden soll. Beachten Sie, dass dieses Vorschau-Feature derzeit keine Unterstützung für SNOWPIPE bietet.

snowflake.enable.schematization

Setzen Sie den Wert auf TRUE, um Schemaerkennung und Schemaentwicklung für den Kafka-Konnektor mit Snowpipe Streaming zu aktivieren. Der Standardwert ist FALSE.

schema.registry.url

Geben Sie die URL des Schemaregistrierungsdienstes an. Der Standardwert ist leer.

Je nach Dateiformat ist schema.registry.url erforderlich oder optional. Die Schemaerkennung mit dem Kafka-Konnektor wird in jedem der folgenden Szenarios unterstützt:

  • Die Schemaregistrierung für Avro und Protobuf ist erforderlich. Die Spalte wird mit den Datentypen erstellt, die in der bereitgestellten Schemaregistrierung definiert sind.

  • Die Schemaregistrierung für JSON ist optional. Wenn es keine Schemaregistrierung gibt, wird der Datentyp von den bereitgestellten Daten abgeleitet.

Konfigurieren Sie weitere Eigenschaften in der Eigenschaftendatei Ihres Kafka-Konnektors wie gewohnt. Weitere Informationen dazu finden Sie unter Konfigurieren des Kafka-Konnektors.

Konverter

Alle Konverter für strukturierte Daten, wie JSON, Avro und Protobuf, werden unterstützt. Beachten Sie, dass wir nur die folgenden Konverter für strukturierte Daten getestet haben:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

Konverter für unstrukturierte Daten werden generell nicht unterstützt. Beispiel:

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Einige kundenspezifische Datenkonverter werden möglicherweise nicht unterstützt. Wenden Sie sich an den Snowflake-Support, wenn Sie Hilfe benötigen.

Nutzungshinweise

  • Die Schemaerkennung mit dem Kafka-Konnektor wird mit und ohne einer bereitgestellten Schemaregistrierung unterstützt. Wenn Sie die Schemaregistrierung (Avro und Protobuf) verwenden, wird die Spalte mit den Datentypen erstellt, die in der bereitgestellten Schemaregistrierung definiert sind. Wenn es kein Schemaregister (JSON) gibt, wird der Datentyp von den bereitgestellten Daten abgeleitet.

  • Die Schemaentwicklung mit dem Kafka-Konnektor unterstützt die folgenden Änderungen an Tabellenspalten:

    • Hinzufügen neuer Spalten

    • Löschen der NOT NULL-Einschränkung, wenn die Quelldatenspalte fehlt.

  • Wenn der Kafka-Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert. Wenn jedoch die Schemaentwicklung für eine bestehende Tabelle deaktiviert ist, versucht der Kafka-Konnektor, die Zeilen mit den nicht übereinstimmenden Schemas an die konfigurierten Dead-Letter-Queues (DLQ) zu senden.

Beispiele

In den folgenden Beispielen werden die Tabellen gezeigt, die erstellt werden, bevor und nachdem Schemaerkennung und Schemaentwicklung für den Kafka-Konnektor mit Snowpipe Streaming aktiviert wurden.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy