Kafka用Snowflake Connectorと Apache Iceberg™ テーブルを使用する¶
バージョン3.0.0より、Snowflake Connector for KafkaはSnowflakeが管理する Apache Iceberg™テーブル にデータを取り込むことができます。
要件と制限¶
Icebergテーブルへのデータ取り込みにKafkaコネクタを構成する前に、以下の要件と制限に注意してください:
Icebergテーブルにデータを取り込むには、バージョン3.0.0以降のKafkaコネクタが必要です。
Icebergテーブルへの取り込みは、Snowpipe StreamingのKafkaコネクタでサポートされています。SnowpipeのKafkaコネクタではサポートされていません。
snowflake.streaming.enable.single.buffer
がfalse
にセットされている場合、Icebergテーブルへの取り込みはサポートされません。コネクタを実行する前にIcebergテーブルを作成する必要があります。詳細については、 構成とセットアップ (このトピック内)をご参照ください。
スキーマ進化の制限¶
Icebergのスキーマ進化は、 AVRO やProtobufなどのスキーマ化されたデータ形式を完全にサポートしています。
スキーマのないプレーンな JSON の場合、コネクタは以下のメッセージタイプを無効とみなし、配信不能キュー (DLQ)に送ります:
コネクタがこれらのメッセージタイプを取り込めるようにテーブルスキーマを手動で変更するには、 ALTER TABLE ステートメントを使用します。
構成とセットアップ¶
Icebergテーブルインジェスト用にKafkaコネクタを構成するには、 Snowpipe Streamingベースのコネクタの通常のセットアップ手順 に従いますが、その際の注意点を以降のセクションで説明します。
外部ボリュームの使用許可¶
Kafka コネクタのロールに、Icebergテーブルに関連付けられた外部ボリュームの USAGE 権限を付与する必要があります。
たとえば、Icebergテーブルが kafka_external_volume
外部ボリュームを使用し、コネクタがロール kafka_connector_role
を使用する場合、次のステートメントを実行します:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期テーブルスキーマは、コネクタ Snowflake.enable.schematization
の設定に依存します。
スキーマ化を有効にすると、 record_metadata
という列を持つテーブルを作成できます:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
コネクタは自動的に record_content
列を作成し、 record_metadata
列スキーマを変更します。
スキーマ化を有効にしない場合は、実際のKafkaメッセージの内容に一致するタイプの record_content
という列を持つテーブルを作成できます。コネクタは自動的に record_metadata
列を作成します。
Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントを使用します:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( record_content OBJECT( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
dogs
や cats
のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。
構成プロパティ¶
snowflake.streaming.iceberg.enabled
コネクタがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、コネクタは失敗します。
- 値:
true
false
- デフォルト:
false