Kafkaコネクタのトラブルシューティング¶
このセクションでは、Kafkaコネクタを使用してデータを取り込む際に発生した問題のトラブルシューティング方法について説明します。
このトピックの内容:
エラー通知¶
Snowpipeのエラー通知を構成します。読み込み中にSnowpipeでファイルエラーが発生すると、この機能は設定されたクラウドメッセージングサービスに通知をプッシュし、データファイルの分析を可能にします。詳細については、 Snowpipeのエラー通知 をご参照ください。
一般的なトラブルシューティングステップ¶
Kafkaコネクタを使用したロードの問題をトラブルシューティングするには、次のステップを実行します。
ステップ1: テーブルの COPY 履歴を表示する¶
ターゲットテーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY ビュー をご参照ください。 COPY_HISTORY 出力に予想されるファイルのセットが含まれていない場合は、以前の期間をクエリします。ファイルが以前のファイルの複製であった場合は、元のファイルをロードしようとしたときに、ロード履歴にアクティビティが記録されていた可能性があります。 STATUS
列は、特定のファイルセットがロードされたか、部分的にロードされたか、またはロードに失敗したかを示します。 FIRST_ERROR_MESSAGE
列は、試行が部分的にロードされたか失敗した場合の理由を示します。
Kafkaコネクタは、ロードできなかったファイルをターゲットテーブルに関連付けられたステージに移動します。テーブルステージを参照するための構文は、 @[namespace.]%table_name
です。
LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。
例:
LIST @mydb.public.%mytable;
ファイル名は次のいずれかの形式です。各フォーマットを生成する条件を表に示します。
ファイルタイプ |
説明 |
---|---|
生バイト |
これらのファイルは、次のパターンに一致します。
これらのファイルの場合、Kafkaの記録は、生バイトからソースファイル形式(Avro、 JSON、またはProtobuf)に変換できませんでした。 この問題の一般的な原因は、ネットワーク障害が原因で文字が記録から削除されることです。Kafkaコネクタは生バイトを解析できなくなったため、記録が壊れました。 |
ソースファイル形式(Avro、 JSON、またはProtobuf) |
これらのファイルは、次のパターンに一致します。
これらのファイルの場合、Kafkaコネクタが生バイトをソースファイル形式に変換し直した後にSnowpipeでエラーが発生し、ファイルをロードできませんでした。 |
次のセクションでは、各ファイルタイプの問題を解決するための手順について説明しています。
生バイト¶
ファイル名 <コネクタ名>/<テーブル名>/<パーティション>/offset_(<キー>/<値>_)<タイムスタンプ>.gz
には、生バイトからソースファイル形式に変換されなかった記録の正確なオフセットが含まれます。問題を解決するには、記録を新しい記録としてKafkaコネクタに再送信します。
ソースファイル形式(Avro、 JSON、またはProtobuf)¶
Snowpipeが、Kafkaトピック用に作成された内部ステージのファイルからデータをロードできなかった場合、ソースファイル形式にあるターゲットテーブルのために、Kafkaコネクタはファイルをステージに移動します。
ファイルのセットに複数の問題がある場合、 COPY_HISTORY 出力の FIRST_ERROR_MESSAGE
列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、テーブルステージからファイルを取得し、それらを名前付きステージにアップロードしてから、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS
に設定して COPY INTO <テーブル> ステートメントを実行する必要があります。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Kafkaコネクタを使用してロードしようとしたファイルのセットを参照します。
データファイルの問題が解決したら、1つ以上の COPY ステートメントを使用してデータを手動でロードできます。
次の例は、 mydb.public
データベースとスキーマの mytable
テーブルのテーブルステージにあるデータファイルを参照します。
テーブルステージでデータファイルを検証し、エラーを解決するには、
LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。
例:
LIST @mydb.public.%mytable;
このセクションの例では、 JSON がデータファイルのソース形式であると想定しています。
GET を使用して、Kafkaコネクタによって作成されたファイルをローカルマシンにダウンロードします。
たとえば、ローカルマシンの
data
という名前のディレクトリにファイルをダウンロードします。- Linuxまたは macOS:
GET @mydb.public.%mytable file:///data/;
- Microsoft Windows:
GET @mydb.public.%mytable file://C:\data\;
CREATE STAGE を使用して、ソースKafkaファイルと同じ形式のデータファイルを保存する名前付き内部ステージを作成します。
たとえば、 JSON ファイルを保存する
kafka_json
という名前の内部ステージを作成します。CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
PUT を使用して、テーブルステージからダウンロードしたファイルをアップロードします。
たとえば、ローカルマシンの
data
ディレクトリにダウンロードしたファイルをアップロードします。- Linuxまたは macOS:
PUT file:///data/ @mydb.public.kafka_json;
- Microsoft Windows:
PUT file://C:\data\ @mydb.public.kafka_json;
テスト目的で、2つのバリアント列を持つ仮テーブルを作成します。このテーブルは、ステージングされたデータファイルを検証する目的のみに使用されます。データはテーブルにロードされません。現在のユーザーセッションが終了すると、テーブルは自動的にドロップされます。
CREATE TEMPORARY TABLE t1 (col1 variant);
COPY INTO *テーブル* ... VALIDATION_MODE = 'RETURN_ALL_ERRORS' ステートメントを実行して、データファイルで発生したすべてのエラーを取得します。このステートメントは、指定されたステージでファイルを検証します。データはテーブルにロードされません。
COPY INTO mydb.public.t1 FROM @mydb.public.kafka_json FILE_FORMAT = (TYPE = JSON) VALIDATION_MODE = 'RETURN_ALL_ERRORS';
ローカルマシンのデータファイルで報告されたすべてのエラーを修正します。
PUT を使用して、固定ファイルをテーブルステージまたは指定された内部ステージのいずれかにアップロードします。
次の例では、ファイルをテーブルステージにアップロードし、既存のファイルを上書きします。
- Linuxまたは macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
- Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
VALIDATION_MODE オプション なし で COPY INTO テーブル を使用し、データをターゲットテーブルにロードします。
オプションで PURGE = TRUE コピーオプションを使用して、データが正常にロードされたらステージからデータファイルを削除するか、 REMOVE を使用して、テーブルステージから手動でファイルを削除します。
COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT) FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable) FILE_FORMAT = (TYPE = 'JSON') PURGE = TRUE;
ステップ2: Kafkaコネクタのログファイルを分析する¶
COPY_HISTORY ビューにデータロードの記録がない場合は、Kafkaコネクタのログファイルを分析します。コネクタは、イベントをログファイルに書き込みます。Snowflake Kafkaコネクタは、すべてのKafkaコネクタプラグインと同じログファイルを共有することに注意してください。このログファイルの名前と場所は、Kafkaコネクタの構成ファイルにある必要があります。詳細については、Apache Kafkaソフトウェア用に提供されているドキュメントをご参照ください。
Snowflake関連のエラーメッセージについては、Kafkaコネクタのログファイルを検索してください。メッセージの多くには、文字列 ERROR
が含まれ、これらのメッセージを見つけやすくするために、ファイル名 com.snowflake.kafka.connector...
が含まれます。
発生する可能性のあるエラーには次があります。
- 構成エラー:
考えられるエラーの原因は次のとおりです。
コネクタには、トピックをサブスクライブするための適切な情報がありません。
コネクタには、Snowflakeテーブルに書き込むための適切な情報がありません(例:認証のキーペアが間違っている可能性があります)。
Kafkaコネクタはそのパラメーターを検証することに注意してください。コネクタは、互換性のない構成パラメーターごとにエラーをスローします。エラーメッセージは、Kafka Connectクラスタのログファイルに書き込まれます。構成の問題が疑われる場合は、そのログファイルのエラーを確認してください。
- 読み取りエラー:
コネクタは、次の理由でKafkaから読み取ることができなかった可能性があります。
KafkaまたはKafka Connectが実行されていない可能性があります。
メッセージはまだ送信されていない可能性があります。
メッセージは削除された可能性があります(期限切れ)。
- 書き込みエラー(ステージ):
考えられるエラーの原因は次のとおりです。
ステージでの権限が不十分です。
ステージのスペースが不足しています。
ステージがドロップされています。
他のユーザーまたはプロセスによるステージへの予期しないファイルの書き込みがあります。
- 書き込みエラー(テーブル):
考えられるエラーの原因は次のとおりです。
テーブルに対する権限が不十分です。
ステップ3:Kafka Connectを確認する¶
Kafka接続ログファイルにエラーが報告されていない場合は、Kafka Connectを確認してください。トラブルシューティングの手順については、Apache Kafkaソフトウェアベンダーが提供するドキュメントをご参照ください。
特定の問題の解決¶
同じトピックパーティションとオフセットにより重複する行¶
バージョン1.4のKafkaコネクタ(またはそれ以上)を使用してデータをロードする場合、同じトピックパーティションとオフセットを持つターゲットテーブルの行が重複していると、ロード操作がデフォルトの実行タイムアウト300000ミリ秒(300秒)を超えたことを示している可能性があります。原因を確認するには、Kafka Connectログファイルで次のエラーを確認します。
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
エラーを解決するには、Kafka設定ファイル(例: <kafka_dir>/config/connect-distributed.properties
)で、以下のプロパティの いずれか を変更します。
consumer.max.poll.interval.ms
実行タイムアウトを
900000
(900秒)に増やす。consumer.max.poll.records
各操作で読み込まれる記録数を
50
に減らす。
問題の報告¶
Snowflakeサポート にサポートを依頼する場合は、次のファイルを準備してください。
Kafkaコネクタの構成ファイル。
重要
ファイルをSnowflakeに提供する前に、秘密キーを削除してください。
Kafkaコネクタのログのコピー。ファイルに部外秘情報や機密情報が 含まれていない ことを確認してください。
JDBC ログファイル。
ログファイルを生成するには、Kafkaコネクタを実行する前に、KafkaConnectクラスターで
JDBC_TRACE = true
環境変数を設定します。JDBC ログファイルの詳細については、Snowflakeコミュニティの この記事 をご参照ください。
ログファイルを接続します。
ログファイルを作成するには、
etc/kafka/connect-log4j.properties
ファイルを編集します。log4j.appender.stdout.layout.ConversionPattern
プロパティを次のように設定します。log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
コネクタコンテキストは、Kafkaバージョン2.3またはそれ以上で使用できます。
詳細については、Confluentウェブサイトの ログの改善 情報をご参照ください。