Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出および進化¶
Snowpipe Streamingを使用したKafkaコネクタは、スキーマ検出および進化をサポートしています。Snowflakeのテーブルの構造を自動的に定義して進化させ、Kafkaコネクタによりロードされた新しいSnowpipe Streamingデータの構造をサポートすることができます。
スキーマ検出と進化がない場合、KafkaコネクタによってロードされたSnowflakeテーブルは、 RECORD_CONTENT と RECORD_METADATA の2つの列(VARIANT)のみで構成されます。スキーマ検出と進化がある場合、Snowflakeはストリーミングデータのスキーマを検出し、ユーザー定義スキーマに自動的に一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しいデータファイルで欠落している列から NOT NULL 制約をドロップしたりすることもできます。
注釈
このプレビュー機能は、Snowpipe Streamingを使用したKafkaコネクタでのみ動作します。現在のところ、ファイルベースのSnowpipeを使用したKafkaコネクタはサポートしていません。
このトピックの内容:
前提条件¶
この機能を有効にする前に、以下の前提条件を設定してください。
Kafkaコネクタバージョン2.0.0以降をダウンロードします。詳細については、 Kafkaコネクタのインストールと構成 をご参照ください。
ALTER TABLE コマンドを使用して、テーブルで
ENABLE_SCHEMA_EVOLUTION
パラメーターを TRUE に設定します。テーブルに対する EVOLVE SCHEMA 権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。
必要なKafkaプロパティの構成¶
Kafkaコネクタのプロパティファイルで、以下の必要なプロパティを構成します。
snowflake.ingestion.method
SNOWPIPE_STREAMING
を使用してKafkaトピックデータをロードするように指定します。なお、このプレビュー機能は現在、SNOWPIPE
をサポートしていません。snowflake.enable.schematization
TRUE
を指定して、Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出および進化を有効にします。デフォルト値はFALSE
です。schema.registry.url
スキーマレジストリサービスの URL に指定します。デフォルト値は空です。
ファイル形式によって、
schema.registry.url
は必須またはオプションです。Kafkaコネクタによるスキーマ検出は、以下のいずれかのシナリオでサポートされます。スキーマレジストリはAvroとProtobufで必須です。列はプロバイダーのスキーマレジストリで定義されたデータ型で作成されます。
スキーマレジストリは JSON ではオプションです。スキーマレジストリがない場合、データ型は提供されるデータに基づいて推測されます。
通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。詳細については、 Kafkaコネクタの構成 をご参照ください。
コンバーター¶
JSON、Avro、Protobufなどの構造化データコンバーターはすべてサポートされています。なお、テスト済みの構造化データコンバーターは以下に限られます。
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
非構造化データコンバーターはサポートされていません。例:
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
一部のカスタマイズされたデータコンバーターには対応していない場合があります。サポートが必要な場合は、Snowflakeサポートにお問い合わせください。
使用上の注意¶
Kafkaコネクタを使用したスキーマ検出は、提供されたスキーマレジストリの有無にかかわらずサポートされています。スキーマレジストリ(Avro および Protobuf)を使用する場合、列は提供されたスキーマレジストリで定義されたデータ型で作成されます。スキーマレジストリ(JSON)がない場合、データ型は提供されるデータに基づいて推測されます。
Kafkaコネクタを使用したスキーマ進化は、以下のテーブル列の変更をサポートします。
新しい列の追加
ソースデータ列がない場合は、 NOT NULL 制約をドロップします。
Kafkaコネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。ただし、既存のテーブルでスキーマ進化が無効になっている場合、Kafkaコネクタはスキーマが不一致の行を構成された配信不能キュー(DLQ)に送信しようとします。
例¶
以下の例は、Snowpipe Streamingを使用したKafkaコネクタでスキーマ検出と進化を有効にする前と後に作成されるテーブルを示しています。
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|