DLQ とメタデータを使用したApache Kafka

注釈

コネクタには、 コネクタ利用規約 が適用されます。

このトピックでは、 DLQ とメタデータコネクタを使用したApache Kafkaについて説明します。これは、Kafka用のレガシーSnowflakeコネクタと同等の機能を提供し、本番ユースケース向けの高度な機能を含む、フル機能のコネクタです。

主要な機能

DLQ とメタデータコネクタを使用したApache Kafkaは包括的な機能を提供します。

  • デッドレターキュー(DLQ) による失敗メッセージ処理のサポート

  • Kafkaメッセージのメタデータがある RECORD_METADATA

  • 構成可能なスキーマ化 - スキーマ検出の有効化または無効化

  • スキーマが進化する Icebergテーブルのサポート

  • 複数のメッセージ形式 - JSON および AVRO のサポート

  • AVRO メッセージ用の スキーマレジストリ統合

  • 高度なパターンがある トピックからテーブルへのマッピング

  • SASL 認証 のサポート

固有パラメーター

Openflow Connector for Kafkaを設定する で説明されている一般的なパラメーターに加えて、このコネクタには高度な機能のための追加のパラメーターコンテキストが含まれます。

メッセージ形式とスキーマパラメーター

パラメーター

説明

必須

メッセージ形式

Kafkaでのメッセージ形式。 JSON / AVRO のいずれかです。デフォルト: JSON

有り

AVRO スキーマ

AVRO メッセージ形式による AVRO Schema Access Strategyで schema-text-property が使用される場合のAvroスキーマ。注: これは、構成されたKafkaトピックから消費されるすべてのメッセージが同じスキーマを共有する場合にのみ使用する必要があります。

無し

AVRO Schema Access Strategy

メッセージの AVRO スキーマにアクセスする方法。 AVRO には必要です。 embedded-avro-schema / schema-reference-reader / schema-text-property のいずれかです。デフォルト: embedded-avro-schema

無し

スキーマレジストリのパラメーター

パラメーター

説明

必須

スキーマレジストリの認証タイプ

スキーマレジストリを使用する場合の認証方法。それ以外の場合は、 NONE を使用します。 NONE / BASIC のいずれかです。デフォルト: NONE

有り

スキーマレジストリ URL

スキーマレジストリの URL。 AVRO メッセージ形式には必要です。

無し

Schema Registry Username

スキーマレジストリのユーザー名。 AVRO メッセージ形式には必要です。

無し

Schema Registry Password

スキーマレジストリのパスワード。 AVRO メッセージ形式には必要です。

無し

DLQ および高度な機能のパラメーター

パラメーター

説明

必須

Kafka DLQ Topic

解析エラーのメッセージ送信先の DLQ トピック。

有り

スキーマ化有効

データを個々の列に挿入するか、単一の RECORD_CONTENT フィールドに挿入するかを決定します。 true / false のいずれかです。デフォルト: true

有り

Iceberg Enabled

プロセッサーがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、プロセッサーは失敗します。デフォルト: false

有り

スキーマ化の動作

コネクタの動作は、 Schematization Enabled パラメーターに基づいて変更されます。

スキーマ化有効

スキーマ化が有効な場合、コネクタは以下のようになります。

  • メッセージの各フィールドに対して個別の列を作成する

  • Kafkaメタデータを含む RECORD_METADATA 列を含む

  • 新しいフィールドが検出されると、テーブルスキーマを自動的に進化させる

  • ネストした JSON/AVRO 構造を個別の列にフラット化する

テーブル構造の例:

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{"timestamp":1669074170090, "headers": {"current.iter...

ABC123

ZTEST

BUY

3572

2

{"timestamp":1669074170400, "headers":{"current.iter...

XYZ789

ZABX

SELL

3024

スキーマ化無効

スキーマ化が無効の場合、コネクタは以下のようになります。

  • RECORD_CONTENTRECORD_METADATA の2列のみを作成する

  • メッセージコンテンツ全体を RECORD_CONTENT に OBJECT として格納する

  • スキーマを自動的に進化させない

  • ダウンストリーム処理に最大限の柔軟性を提供する

テーブル構造の例:

RECORD_METADATA

RECORD_CONTENT

1

{"timestamp":1669074170090, "headers": {"current.iter...

{"account": "ABC123", "symbol": "ZTEST", "side":...

2

{"timestamp":1669074170400, "headers":{"current.iter...

{"account": "XYZ789", "symbol": "ZABX", "side":...

スキーマ検出を有効または無効にするには、コネクタ構成プロパティで Schematization Enabled プロパティを使用します。

スキーマの検出と進化

コネクタはスキーマの検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするように定義し、自動的に進化させることができます。

スキーマ検出と進化を行わない場合、コネクタによってロードされたSnowflakeテーブルは、 RECORD_CONTENTRECORD_METADATA の2つの OBJECT 列のみで構成されます。

スキーマ検出と進化がある場合、Snowflakeはストリーミングデータのスキーマを検出し、ユーザー定義スキーマに自動的に一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しいデータファイルで欠落している列から NOT NULL 制約をドロップしたりすることもできます。

コネクタによるスキーマ検出は、提供されたスキーマレジストリの有無にかかわらずサポートされています。スキーマレジストリ(Avro)を使用する場合、列はスキーマレジストリで定義されたデータ型で作成されます。スキーマレジストリ(JSON)がない場合、データ型は提供されるデータに基づいて推測されます。

それ以上のスキーマ化ではJSONARRAYはサポートされていません。

スキーマ進化の有効化

コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。

既存のテーブルでスキーマ進化を有効または無効にしたい場合は、 ALTER TABLE コマンドを使用して、 ENABLE_SCHEMA_EVOLUTION パラメーターを設定します。テーブルで OWNERSHIP 権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。

ただし、既存のテーブルに対してスキーマの進化が無効になっている場合、コネクタは不一致のスキーマがある行を構成されたデッドレターキュー(DLQ)に送ろうとします。

RECORD_METADATA 構造

RECORD_METADATA 列には、重要なKafkaメッセージメタデータが含まれます。

フィールド

説明

offset

Kafkaパーティション内のメッセージオフセット

topic

Kafkaトピック名

partition

Kafkaパーティション番号

key

メッセージキー(存在する場合)

timestamp

メッセージのタイムスタンプ

SnowflakeConnectorPushTime

コネクタがKafkaからメッセージをフェッチした時点のタイムスタンプ

headers

メッセージヘッダーのマップ(存在する場合)

デッドレターキュー(DLQ)

DLQ 機能は、正常に処理できないメッセージを処理します。

DLQ 動作

  • 解析失敗 - JSON/AVRO 形式が無効なメッセージが DLQ に送信されます。

  • スキーマの不一致 - スキーマの進化が無効な場合に、期待されるスキーマと一致しないメッセージ。

  • 処理エラー - 取り込み中のその他の処理エラー

Icebergテーブルのサポート

Openflow Connector for Kafkaは、 Iceberg Enabledtrue に設定されている場合、Snowflake管理の Apache Iceberg™ テーブル にデータを取り込むことができます。

要件と制限

Icebergテーブルの取り込み用にOpenflow Kafkaコネクタを構成する前に、以下の要件と制限に注意してください。

  • コネクタを実行する前にIcebergテーブルを作成する必要があります。

  • ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。

構成とセットアップ

Icebergテーブルの取り込み用にOpenflow Connector for Kafkaを構成するには、 Openflow Connector for Kafkaを設定する の手順に従いますが、以下のセクションで若干の違いがあります。

Icebergテーブルへの取り込みを有効にする

Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。

インジェスト用にIcebergテーブルを作成

コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期テーブルスキーマは、コネクタの Schematization Enabled プロパティ設定に依存します。

スキーマ化を有効にする場合は、 record_metadata という列を持つテーブルを作成する必要があります。

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

コネクタはメッセージフィールドの列を自動的に作成し、 record_metadata 列スキーマを変更します。

スキーマ化を有効にしない場合は、 record_content という列を持つテーブルを、実際のKafkaメッセージコンテンツと一致するタイプで作成する必要があります。コネクタは自動的に record_metadata 列を作成します。

Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用します。

例えば、次のようなメッセージを考えてみましょう:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Icebergテーブル作成の例

スキーマ化を有効にした場合:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

スキーマ化を無効にした場合:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

注釈

RECORD_METADATA は常に作成されなければなりません。 dogscats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。

ユースケース

このコネクタは次のようなケースに最適です。

  • DLQ が必要な 本番環境

  • Kafkaメタデータが重要な データ系統と監査

  • スキーマ進化要件のある 複雑なメッセージ処理

  • Icebergテーブル統合

メタデータや DLQ 機能のないよりシンプルな取り込みが必要な場合は、代わりに JSON/AVRO データ形式用Apache Kafka コネクタを検討してください。