Kafka用Snowflake Connectorと Apache Iceberg™ テーブルを使用する

バージョン3.0.0より、Snowflake Connector for KafkaはSnowflakeが管理する Apache Iceberg™テーブル にデータを取り込むことができます。

要件と制限

Icebergテーブルへのデータ取り込みにKafkaコネクタを構成する前に、以下の要件と制限に注意してください:

  • Icebergテーブルにデータを取り込むには、バージョン3.0.0以降のKafkaコネクタが必要です。

  • Icebergテーブルへの取り込みは、Snowpipe StreamingのKafkaコネクタでサポートされています。SnowpipeのKafkaコネクタではサポートされていません。

  • snowflake.streaming.enable.single.bufferfalse にセットされている場合、Icebergテーブルへの取り込みはサポートされません。

  • コネクタを実行する前にIcebergテーブルを作成する必要があります。詳細については、 構成とセットアップ (このトピック内)をご参照ください。

スキーマ進化の制限

Icebergのスキーマ進化は、 AVRO やProtobufなどのスキーマ化されたデータ形式を完全にサポートしています。

スキーマのないプレーンな JSON の場合、コネクタは以下のメッセージタイプを無効とみなし、配信不能キュー (DLQ)に送ります:

  • 対応する値が null または `[]`の場合、新しい列を持つメッセージ

  • 対応する値が null または `[]`の場合、構造化オブジェクトに新しいフィールドを持つメッセージ。

コネクタがこれらのメッセージタイプを取り込めるようにテーブルスキーマを手動で変更するには、 ALTER TABLE ステートメントを使用します。

構成とセットアップ

Icebergテーブルインジェスト用にKafkaコネクタを構成するには、 Snowpipe Streamingベースのコネクタの通常のセットアップ手順 に従いますが、その際の注意点を以降のセクションで説明します。

外部ボリュームの使用許可

Kafka コネクタのロールに、Icebergテーブルに関連付けられた外部ボリュームの USAGE 権限を付与する必要があります。

たとえば、Icebergテーブルが kafka_external_volume 外部ボリュームを使用し、コネクタがロール kafka_connector_role を使用する場合、次のステートメントを実行します:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

インジェスト用にIcebergテーブルを作成

コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期テーブルスキーマは、コネクタ Snowflake.enable.schematization の設定に依存します。

スキーマ化を有効にすると、 record_metadata という列を持つテーブルを作成できます:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

コネクタは自動的に record_content 列を作成し、 record_metadata 列スキーマを変更します。

スキーマ化を有効にしない場合は、実際のKafkaメッセージの内容に一致するタイプの record_content という列を持つテーブルを作成できます。コネクタは自動的に record_metadata 列を作成します。

Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。

例えば、次のようなメッセージを考えてみましょう:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントを使用します:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

注釈

dogscats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。

構成プロパティ

snowflake.streaming.iceberg.enabled

コネクタがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、コネクタは失敗します。

:

  • true

  • false

デフォルト:

false