Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出および進化

Snowpipe Streamingを使用したKafkaコネクタは、スキーマ検出および進化をサポートしています。Snowflakeのテーブルの構造を自動的に定義して進化させ、Kafkaコネクタによりロードされた新しいSnowpipe Streamingデータの構造をサポートすることができます。

スキーマ検出と進化がない場合、KafkaコネクタによってロードされたSnowflakeテーブルは、 RECORD_CONTENT と RECORD_METADATA の2つの列(VARIANT)のみで構成されます。スキーマ検出と進化がある場合、Snowflakeはストリーミングデータのスキーマを検出し、ユーザー定義スキーマに自動的に一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しいデータファイルで欠落している列から NOT NULL 制約をドロップしたりすることもできます。

注釈

このプレビュー機能は、Snowpipe Streamingを使用したKafkaコネクタでのみ動作します。現在のところ、ファイルベースのSnowpipeを使用したKafkaコネクタはサポートしていません。

このトピックの内容:

前提条件

この機能を有効にする前に、以下の前提条件を設定してください。

  • Kafkaコネクタバージョン2.0.0以降をダウンロードします。詳細については、 Kafkaコネクタのインストールと構成 をご参照ください。

  • ALTER TABLE コマンドを使用して、テーブルで ENABLE_SCHEMA_EVOLUTION パラメーターを TRUE に設定します。テーブルに対する EVOLVE SCHEMA 権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。

必要なKafkaプロパティの構成

Kafkaコネクタのプロパティファイルで、以下の必要なプロパティを構成します。

snowflake.ingestion.method

SNOWPIPE_STREAMING を使用してKafkaトピックデータをロードするように指定します。なお、このプレビュー機能は現在、 SNOWPIPE をサポートしていません。

snowflake.enable.schematization

TRUE を指定して、Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出および進化を有効にします。デフォルト値は FALSE です。

schema.registry.url

スキーマレジストリサービスの URL に指定します。デフォルト値は空です。

ファイル形式によって、 schema.registry.url は必須またはオプションです。Kafkaコネクタによるスキーマ検出は、以下のいずれかのシナリオでサポートされます。

  • スキーマレジストリはAvroとProtobufで必須です。列はプロバイダーのスキーマレジストリで定義されたデータ型で作成されます。

  • スキーマレジストリは JSON ではオプションです。スキーマレジストリがない場合、データ型は提供されるデータに基づいて推測されます。

通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。詳細については、 Kafkaコネクタの構成 をご参照ください。

コンバーター

JSON、Avro、Protobufなどの構造化データコンバーターはすべてサポートされています。なお、テスト済みの構造化データコンバーターは以下に限られます。

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

非構造化データコンバーターはサポートされていません。例:

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

一部のカスタマイズされたデータコンバーターには対応していない場合があります。サポートが必要な場合は、Snowflakeサポートにお問い合わせください。

使用上の注意

  • Kafkaコネクタを使用したスキーマ検出は、提供されたスキーマレジストリの有無にかかわらずサポートされています。スキーマレジストリ(Avro および Protobuf)を使用する場合、列は提供されたスキーマレジストリで定義されたデータ型で作成されます。スキーマレジストリ(JSON)がない場合、データ型は提供されるデータに基づいて推測されます。

  • Kafkaコネクタを使用したスキーマ進化は、以下のテーブル列の変更をサポートします。

    • 新しい列の追加

    • ソースデータ列がない場合は、 NOT NULL 制約をドロップします。

  • Kafkaコネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。ただし、既存のテーブルでスキーマ進化が無効になっている場合、Kafkaコネクタはスキーマが不一致の行を構成された配信不能キュー(DLQ)に送信しようとします。

以下の例は、Snowpipe Streamingを使用したKafkaコネクタでスキーマ検出と進化を有効にする前と後に作成されるテーブルを示しています。

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy