DLQ とメタデータを使用したApache Kafka¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 DLQ とメタデータコネクタを使用したApache Kafkaについて説明します。これは、Kafka用のレガシーSnowflakeコネクタと同等の機能を提供し、本番ユースケース向けの高度な機能を含む、フル機能のコネクタです。
主要な機能¶
DLQ とメタデータコネクタを使用したApache Kafkaは包括的な機能を提供します。
デッドレターキュー(DLQ) による失敗メッセージ処理のサポート
Kafkaメッセージのメタデータがある RECORD_METADATA 列
構成可能なスキーマ化 - スキーマ検出の有効化または無効化
スキーマが進化する Icebergテーブルのサポート
複数のメッセージ形式 - JSON および AVRO のサポート
AVRO メッセージ用の スキーマレジストリ統合
高度なパターンがある トピックからテーブルへのマッピング
SASL 認証 のサポート
固有パラメーター¶
Openflow Connector for Kafkaを設定する で説明されている一般的なパラメーターに加えて、このコネクタには高度な機能のための追加のパラメーターコンテキストが含まれます。
メッセージ形式とスキーマパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
メッセージ形式 |
Kafkaでのメッセージ形式。 JSON / AVRO のいずれかです。デフォルト: JSON |
有り |
AVRO スキーマ |
AVRO メッセージ形式による AVRO Schema Access Strategyで schema-text-property が使用される場合のAvroスキーマ。注: これは、構成されたKafkaトピックから消費されるすべてのメッセージが同じスキーマを共有する場合にのみ使用する必要があります。 |
無し |
AVRO Schema Access Strategy |
メッセージの AVRO スキーマにアクセスする方法。 AVRO には必要です。 embedded-avro-schema / schema-reference-reader / schema-text-property のいずれかです。デフォルト: embedded-avro-schema |
無し |
スキーマレジストリのパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
スキーマレジストリの認証タイプ |
スキーマレジストリを使用する場合の認証方法。それ以外の場合は、 NONE を使用します。 NONE / BASIC のいずれかです。デフォルト: NONE |
有り |
スキーマレジストリ URL |
スキーマレジストリの URL。 AVRO メッセージ形式には必要です。 |
無し |
Schema Registry Username |
スキーマレジストリのユーザー名。 AVRO メッセージ形式には必要です。 |
無し |
Schema Registry Password |
スキーマレジストリのパスワード。 AVRO メッセージ形式には必要です。 |
無し |
DLQ および高度な機能のパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
Kafka DLQ Topic |
解析エラーのメッセージ送信先の DLQ トピック。 |
有り |
スキーマ化有効 |
データを個々の列に挿入するか、単一の RECORD_CONTENT フィールドに挿入するかを決定します。 true / false のいずれかです。デフォルト: true |
有り |
Iceberg Enabled |
プロセッサーがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、プロセッサーは失敗します。デフォルト: false |
有り |
スキーマ化の動作¶
コネクタの動作は、 Schematization Enabled パラメーターに基づいて変更されます。
スキーマ化有効¶
スキーマ化が有効な場合、コネクタは以下のようになります。
メッセージの各フィールドに対して個別の列を作成する
Kafkaメタデータを含む RECORD_METADATA 列を含む
新しいフィールドが検出されると、テーブルスキーマを自動的に進化させる
ネストした JSON/AVRO 構造を個別の列にフラット化する
テーブル構造の例:
行 |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{"timestamp":1669074170400, "headers":{"current.iter... |
XYZ789 |
ZABX |
SELL |
3024 |
スキーマ化無効¶
スキーマ化が無効の場合、コネクタは以下のようになります。
RECORD_CONTENT と RECORD_METADATA の2列のみを作成する
メッセージコンテンツ全体を RECORD_CONTENT に OBJECT として格納する
スキーマを自動的に進化させない
ダウンストリーム処理に最大限の柔軟性を提供する
テーブル構造の例:
行 |
RECORD_METADATA |
RECORD_CONTENT |
---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
{"account": "ABC123", "symbol": "ZTEST", "side":... |
2 |
{"timestamp":1669074170400, "headers":{"current.iter... |
{"account": "XYZ789", "symbol": "ZABX", "side":... |
スキーマ検出を有効または無効にするには、コネクタ構成プロパティで Schematization Enabled
プロパティを使用します。
スキーマの検出と進化¶
コネクタはスキーマの検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするように定義し、自動的に進化させることができます。
スキーマ検出と進化を行わない場合、コネクタによってロードされたSnowflakeテーブルは、 RECORD_CONTENT
と RECORD_METADATA
の2つの OBJECT
列のみで構成されます。
スキーマ検出と進化がある場合、Snowflakeはストリーミングデータのスキーマを検出し、ユーザー定義スキーマに自動的に一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しいデータファイルで欠落している列から NOT NULL
制約をドロップしたりすることもできます。
コネクタによるスキーマ検出は、提供されたスキーマレジストリの有無にかかわらずサポートされています。スキーマレジストリ(Avro)を使用する場合、列はスキーマレジストリで定義されたデータ型で作成されます。スキーマレジストリ(JSON)がない場合、データ型は提供されるデータに基づいて推測されます。
それ以上のスキーマ化ではJSONARRAYはサポートされていません。
スキーマ進化の有効化¶
コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。
既存のテーブルでスキーマ進化を有効または無効にしたい場合は、 ALTER TABLE コマンドを使用して、 ENABLE_SCHEMA_EVOLUTION
パラメーターを設定します。テーブルで OWNERSHIP
権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。
ただし、既存のテーブルに対してスキーマの進化が無効になっている場合、コネクタは不一致のスキーマがある行を構成されたデッドレターキュー(DLQ)に送ろうとします。
RECORD_METADATA 構造¶
RECORD_METADATA 列には、重要なKafkaメッセージメタデータが含まれます。
フィールド |
説明 |
---|---|
offset |
Kafkaパーティション内のメッセージオフセット |
topic |
Kafkaトピック名 |
partition |
Kafkaパーティション番号 |
key |
メッセージキー(存在する場合) |
timestamp |
メッセージのタイムスタンプ |
SnowflakeConnectorPushTime |
コネクタがKafkaからメッセージをフェッチした時点のタイムスタンプ |
headers |
メッセージヘッダーのマップ(存在する場合) |
デッドレターキュー(DLQ)¶
DLQ 機能は、正常に処理できないメッセージを処理します。
DLQ 動作¶
解析失敗 - JSON/AVRO 形式が無効なメッセージが DLQ に送信されます。
スキーマの不一致 - スキーマの進化が無効な場合に、期待されるスキーマと一致しないメッセージ。
処理エラー - 取り込み中のその他の処理エラー
Icebergテーブルのサポート¶
Openflow Connector for Kafkaは、 Iceberg Enabled が true に設定されている場合、Snowflake管理の Apache Iceberg™ テーブル にデータを取り込むことができます。
要件と制限¶
Icebergテーブルの取り込み用にOpenflow Kafkaコネクタを構成する前に、以下の要件と制限に注意してください。
コネクタを実行する前にIcebergテーブルを作成する必要があります。
ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。
構成とセットアップ¶
Icebergテーブルの取り込み用にOpenflow Connector for Kafkaを構成するには、 Openflow Connector for Kafkaを設定する の手順に従いますが、以下のセクションで若干の違いがあります。
Icebergテーブルへの取り込みを有効にする¶
Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled
パラメーターを true
に設定する必要があります。
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期テーブルスキーマは、コネクタの Schematization Enabled
プロパティ設定に依存します。
スキーマ化を有効にする場合は、 record_metadata
という列を持つテーブルを作成する必要があります。
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
コネクタはメッセージフィールドの列を自動的に作成し、 record_metadata
列スキーマを変更します。
スキーマ化を有効にしない場合は、 record_content
という列を持つテーブルを、実際のKafkaメッセージコンテンツと一致するタイプで作成する必要があります。コネクタは自動的に record_metadata
列を作成します。
Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用します。
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Icebergテーブル作成の例¶
スキーマ化を有効にした場合:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
スキーマ化を無効にした場合:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
RECORD_METADATA は常に作成されなければなりません。 dogs
や cats
のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。
ユースケース¶
このコネクタは次のようなケースに最適です。
DLQ が必要な 本番環境
Kafkaメタデータが重要な データ系統と監査
スキーマ進化要件のある 複雑なメッセージ処理
Icebergテーブル統合
メタデータや DLQ 機能のないよりシンプルな取り込みが必要な場合は、代わりに JSON/AVRO データ形式用Apache Kafka コネクタを検討してください。