Kafka用Snowflake Connectorと Apache Iceberg™ テーブルを使用する¶
バージョン3.0.0より、Snowflake Connector for KafkaはSnowflakeが管理する Apache Iceberg™テーブル にデータを取り込むことができます。
要件と制限¶
Icebergテーブルへのデータ取り込みにKafkaコネクタを構成する前に、以下の要件と制限に注意してください:
Icebergテーブルにデータを取り込むには、バージョン3.0.0以降のKafkaコネクタが必要です。
Icebergテーブルへの取り込みは、Snowpipe StreamingのKafkaコネクタでサポートされています。SnowpipeのKafkaコネクタではサポートされていません。
snowflake.streaming.enable.single.bufferがfalseにセットされている場合、Icebergテーブルへの取り込みはサポートされません。コネクタを実行する前にIcebergテーブルを作成する必要があります。詳細については、 構成とセットアップ (このトピック内)をご参照ください。
スキーマ進化の制限¶
Icebergのスキーマ進化は、 AVRO やProtobufなどのスキーマ化されたデータ形式を完全にサポートしています。
スキーマのないプレーンな JSON の場合、コネクタは以下のメッセージタイプを無効とみなし、配信不能キュー (DLQ)に送ります:
コネクタがこれらのメッセージタイプを取り込めるようにテーブルスキーマを手動で変更するには、 ALTER TABLE ステートメントを使用します。
構成とセットアップ¶
Icebergテーブルインジェスト用にKafkaコネクタを構成するには、 Snowpipe Streamingベースのコネクタの通常のセットアップ手順 に従いますが、その際の注意点を以降のセクションで説明します。
外部ボリュームの使用許可¶
Kafka コネクタのロールに、Icebergテーブルに関連付けられた外部ボリュームの USAGE 権限を付与する必要があります。
たとえば、Icebergテーブルが kafka_external_volume 外部ボリュームを使用し、コネクタがロール kafka_connector_role を使用する場合、次のステートメントを実行します:
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期テーブルスキーマは、コネクタ Snowflake.enable.schematization の設定に依存します。
スキーマ化を有効にすると、 record_metadata という列を持つテーブルを作成できます:
コネクタはメッセージフィールドの列を自動的に作成し、 record_metadata 列スキーマを変更します。
スキーマ化を有効にしない場合は、実際のKafkaメッセージの内容に一致するタイプの record_content という列を持つテーブルを作成できます。コネクタは自動的に record_metadata 列を作成します。
Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。
例えば、次のようなメッセージを考えてみましょう:
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントを使用します:
注釈
dogs や cats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。
構成プロパティ¶
snowflake.streaming.iceberg.enabledコネクタがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、コネクタは失敗します。
- 値:
truefalse
- デフォルト:
false