Kafkaコネクタのトラブルシューティング

このセクションでは、Kafkaコネクタを使用してデータを取り込む際に発生した問題のトラブルシューティング方法について説明します。

このトピックの内容:

問題のトラブルシューティング

このセクションでは、Kafkaコネクタを使用して負荷の問題をトラブルシューティングするための体系的なアプローチについて説明します。

ステップ1:テーブルの COPY 履歴を表示する

ターゲットテーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY ビュー をご参照ください。 COPY_HISTORY 出力に予想されるファイルのセットが含まれていない場合は、以前の期間をクエリします。ファイルが以前のファイルの複製であった場合は、元のファイルをロードしようとしたときに、ロード履歴にアクティビティが記録されていた可能性があります。 STATUS 列は、特定のファイルセットがロードされたか、部分的にロードされたか、またはロードに失敗したかを示します。 FIRST_ERROR_MESSAGE 列は、試行が部分的にロードされたか失敗した場合の理由を示します。

Kafkaコネクタは、ロードできなかったファイルをターゲットテーブルに関連付けられたステージに移動します。テーブルステージを参照するための構文は、 @[名前空間.]%テーブル名 です。

LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。

例:

LIST @mydb.public.%mytable;

ファイル名は次のいずれかの形式です。各フォーマットを生成する条件を表に示します。

ファイルタイプ

ファイル形式

説明

生バイト

<コネクタ名>/<テーブル名>/<パーティション>/offset_(<キー>/<値>_)<タイムスタンプ>.gz

Kafkaレコードは、生バイトからソースファイル形式(Avro、 JSON、またはProtobuf)に変換できませんでした。この問題の一般的な原因は、ネットワーク障害が原因で文字がレコードから削除されることです。Kafkaコネクタは生バイトを解析できなくなったため、レコードが壊れました。

ソースファイル形式(Avro、 JSON、またはProtobuf)

<コネクタ名>/<テーブル名>/<パーティション>/<開始オフセット>_<終了オフセット>_<タイムスタンプ>.<ファイルタイプ>.gz

Kafkaコネクタが生バイトをソースファイル形式に変換し直した後、Snowpipeでエラーが発生し、ファイルをロードできませんでした。

次のセクションでは、各ファイルタイプの問題を解決するための手順について説明しています。

生バイト

ファイル名 <コネクタ名>/<テーブル名>/<パーティション>/offset_(<キー>/<値>_)<タイムスタンプ>.gz には、生バイトからソースファイル形式に変換されなかったレコードの正確なオフセットが含まれます。問題を解決するには、レコードを新しいレコードとしてKafkaコネクタに再送信します。

ソースファイル形式(Avro、 JSON、またはProtobuf)

Snowpipeが、Kafkaトピック用に作成された内部ステージのファイルからデータをロードできなかった場合、ソースファイル形式にあるターゲットテーブルのために、Kafkaコネクタはファイルをステージに移動します。

ファイルのセットに複数の問題がある場合、 COPY_HISTORY 出力の FIRST_ERROR_MESSAGE 列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、テーブルステージからファイルを取得して名前付きステージにアップロードし、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS に設定して COPY INTO <テーブル> ステートメントを実行する必要があります。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Kafkaコネクタを使用してロードしようとしたファイルのセットを参照します。

データファイルの問題が解決したら、1つ以上の COPY ステートメントを使用してデータを手動でロードできます。

次の例は、 mydb.public データベースとスキーマの mytable テーブルのテーブルステージにあるデータファイルを参照します。

テーブルステージでデータファイルを検証し、エラーを解決するには、

  1. LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。

    例:

    LIST @mydb.public.%mytable;
    

    このセクションの例では、 JSON がデータファイルのソース形式であると想定しています。

  2. GET を使用して、Kafkaコネクタによって作成されたファイルをローカルマシンにダウンロードします。

    たとえば、ローカルマシンの data という名前のディレクトリにファイルをダウンロードします。

    Linuxまたは macOS
    GET @mydb.public.%mytable file:///data/;
    
    Microsoft Windows
    GET @mydb.public.%mytable file://C:\data\;
    
  3. CREATE STAGE を使用して、ソースKafkaファイルと同じ形式のデータファイルを保存する名前付き内部ステージを作成します。

    たとえば、 JSON ファイルを保存する kafka_json という名前の内部ステージを作成します。

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
  4. PUT を使用して、テーブルステージからダウンロードしたファイルをアップロードします。

    たとえば、ローカルマシンの data ディレクトリにダウンロードしたファイルをアップロードします。

    Linuxまたは macOS
    PUT file:///data/ @mydb.public.kafka_json;
    
    Microsoft Windows
    PUT file://C:\data\ @mydb.public.kafka_json;
    
  5. テスト目的で、2つのバリアント列を持つ仮テーブルを作成します。このテーブルは、ステージングされたデータファイルを検証する目的のみに使用されます。データはテーブルにロードされません。現在のユーザーセッションが終了すると、テーブルは自動的にドロップされます。

    CREATE TEMPORARY TABLE t1 (col1 variant, col2 variant);
    
  6. COPY INTO *テーブル* ... VALIDATION_MODE = 'RETURN_ALL_ERRORS' ステートメントを実行して、データファイルで発生したすべてのエラーを取得します。このステートメントは、指定されたステージでファイルを検証します。データはテーブルにロードされません。

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
  7. ローカルマシンのデータファイルで報告されたすべてのエラーを修正します。

  8. PUT を使用して、固定ファイルをテーブルステージまたは指定された内部ステージのいずれかにアップロードします。

    次の例では、ファイルをテーブルステージにアップロードし、既存のファイルを上書きします。

    Linuxまたは macOS
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Windows
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
  9. VALIDATION_MODE オプション なし で COPY INTO テーブル を使用し、データをターゲットテーブルにロードします。

    オプションで PURGE = TRUE コピーオプションを使用して、データが正常にロードされたらステージからデータファイルを削除するか、 REMOVE を使用して、テーブルステージから手動でファイルを削除します。

    COPY INTO mydb.public.mytable
      FROM @mydb.public.%mytable PURGE = TRUE;
    

ステップ2:Kafkaコネクタのログファイルを分析する

COPY_HISTORY ビューにデータロードの記録がない場合は、Kafkaコネクタのログファイルを分析します。コネクタは、イベントをログファイルに書き込みます。Snowflake Kafkaコネクタは、すべてのKafkaコネクタプラグインと同じログファイルを共有することに注意してください。このログファイルの名前と場所は、Kafkaコネクタの構成ファイルにある必要があります。詳細については、Apache Kafkaソフトウェア用に提供されているドキュメントをご参照ください。

Snowflake関連のエラーメッセージについては、Kafkaコネクタのログファイルを検索してください。メッセージの多くには、文字列 ERROR が含まれ、これらのメッセージを見つけやすくするために、ファイル名 com.snowflake.kafka.connector... が含まれます。

発生する可能性のあるエラーには次があります。

構成エラー

考えられるエラーの原因は次のとおりです。

  • コネクタには、トピックをサブスクライブするための適切な情報がありません。

  • コネクタには、Snowflakeテーブルに書き込むための適切な情報がありません(例:認証のキーペアが間違っている可能性があります)。

Kafkaコネクタはそのパラメーターを検証することに注意してください。コネクタは、互換性のない構成パラメーターごとにエラーをスローします。エラーメッセージは、Kafka Connectクラスタのログファイルに書き込まれます。構成の問題が疑われる場合は、そのログファイルのエラーを確認してください。

読み取りエラー

コネクタは、次の理由でKafkaから読み取ることができなかった可能性があります。

  • KafkaまたはKafka Connectが実行されていない可能性があります。

  • メッセージはまだ送信されていない可能性があります。

  • メッセージは削除された可能性があります(期限切れ)。

書き込みエラー(ステージ)

考えられるエラーの原因は次のとおりです。

  • ステージでの権限が不十分です。

  • ステージのスペースが不足しています。

  • ステージがドロップされています。

  • 他のユーザーまたはプロセスによるステージへの予期しないファイルの書き込みがあります。

書き込みエラー(テーブル)

考えられるエラーの原因は次のとおりです。

  • テーブルに対する権限が不十分です。

ステップ3:Kafka Connectを確認する

Kafka接続ログファイルにエラーが報告されていない場合は、Kafka Connectを確認してください。トラブルシューティングの手順については、Apache Kafkaソフトウェアベンダーが提供するドキュメントをご参照ください。

特定の問題の解決

同じトピックパーティションとオフセットにより重複する行

バージョン1.4のKafkaコネクタ(またはそれ以上)を使用してデータをロードする場合、同じトピックパーティションとオフセットを持つターゲットテーブルの行が重複していると、ロード操作がデフォルトの実行タイムアウト300000ミリ秒(300秒)を超えたことを示している可能性があります。原因を確認するには、Kafka Connectログファイルで次のエラーを確認します。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)

エラーを解決するには、Kafka設定ファイル(例: <kafka_dir>/config/connect-distributed.properties)で、以下のプロパティの いずれか を変更します。

consumer.max.poll.interval.ms

実行タイムアウトを 900000 (900秒)に増やす。

consumer.max.poll.records

各操作で読み込まれるレコード数を 50 に減らす。

問題の報告

Snowflakeサポート にサポートを依頼する場合は、次のファイルを準備してください。

  • Kafkaコネクタの構成ファイル。

    重要

    ファイルをSnowflakeに提供する前に、秘密キーを削除してください。

  • Kafkaコネクタのログのコピー。ファイルに部外秘情報や機密情報が 含まれていない ことを確認してください。