Kafkaコネクタのトラブルシューティング

このセクションでは、Kafkaコネクタを使用してデータを取り込む際に発生した問題のトラブルシューティング方法について説明します。

このトピックの内容:

問題のトラブルシューティング

このセクションでは、Kafkaコネクタを使用して負荷の問題をトラブルシューティングするための体系的なアプローチについて説明します。

ステップ1:テーブルの COPY 履歴を表示する

ターゲットテーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY ビュー をご参照ください。 COPY_HISTORY 出力に予想されるファイルのセットが含まれていない場合は、以前の期間をクエリします。ファイルが以前のファイルの複製であった場合は、元のファイルをロードしようとしたときに、ロード履歴にアクティビティが記録されていた可能性があります。

STATUS 列は、特定のファイルセットがロードされたか、部分的にロードされたか、またはロードに失敗したかを示します。 FIRST_ERROR_MESSAGE 列は、試行が部分的にロードされたか失敗した場合の理由を示します。

データファイルの検証とエラーの解決

SnowpipeがKafkaトピック用に作成された内部ステージのファイルからデータをロードできなかった場合、Kafkaコネクタはファイルをターゲットテーブルに関連付けられた特別なステージに移動します。テーブルステージを参照するための構文は、 @[名前空間.]%テーブル名 です。

ファイルのセットに複数の問題がある場合、 COPY_HISTORY 出力の FIRST_ERROR_MESSAGE 列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS に設定して COPY INTO <テーブル> ステートメントを実行します。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Kafkaコネクタを使用してロードしようとしたファイルのセットを参照します。

データファイルの問題が解決したら、1つ以上の COPY ステートメントを使用してデータを手動でロードできます。

次の例は、 mydb.public データベースとスキーマの mytable テーブルのテーブルステージにあるデータファイルを参照します。

テーブルステージでデータファイルを検証し、エラーを解決するには:

  1. テーブルステージにあるすべてのファイルを一覧表示します( LIST を使用)。

    LIST @mydb.public.%mytable;
    
    +-----------------+------+----------------------------------+-------------------------------+
    | name            | size | md5                              | last_modified                 |
    |-----------------+------+----------------------------------+-------------------------------|
    | myfile.csv.gz   |  512 | a123cdef1234567890abcdefghijk123 | Tue, 22 Oct 2019 14:20:31 GMT |
    +-----------------+------+----------------------------------+-------------------------------+
    
  2. ファイルで発生したすべてのエラーを取得します( COPY INTO テーブル ... VALIDATION_MODE = 'RETURN_ALL_ERRORS' を使用)。

    COPY INTO mydb.public.mytable FROM @mydb.public.%mytable VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
  3. テーブルステージからローカルディレクトリにデータファイルをダウンロードします( GET を使用)。

    Linux/Mac
    GET @mydb.public.%mytable file:///tmp/;
    
    Windows
    GET @mydb.public.%mytable file://C:\temp\;
    
  4. データファイル内のすべてのエラーを修正します。

  5. Kafkaトピックのテーブルステージまたは名前付き内部ステージのいずれかにファイルをステージングします( PUT を使用)。この例では、ファイルをテーブルステージにステージングし、既存のファイルを上書きします。

    Linux/Mac
    PUT file:///tmp/myfile.csv @mydb.public.%mytable;
    
    Windows
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable;
    
  6. データをターゲットテーブルにロードします( VALIDATION_MODE オプションなしで COPY INTO テーブル を使用)。オプションで PURGE = TRUE コピーオプションを使用して、データが正常にロードされたらステージからデータファイルを削除するか、テーブルステージから手動でファイルを削除できます( REMOVE を使用)。

    COPY INTO mydb.public.mytable FROM @mydb.public.%mytable PURGE = TRUE;
    

ステップ2:Kafkaコネクタのログファイルを分析する

COPY_HISTORY ビューにデータロードの記録がない場合は、Kafkaコネクタのログファイルを分析します。コネクタは、イベントをログファイルに書き込みます。Snowflake Kafkaコネクタは、すべてのKafkaコネクタプラグインと同じログファイルを共有することに注意してください。このログファイルの名前と場所は、Kafkaコネクタの構成ファイルにある必要があります。詳細については、Apache Kafkaソフトウェア用に提供されているドキュメントをご参照ください。

Snowflake関連のエラーメッセージについては、Kafkaコネクタのログファイルを検索してください。メッセージの多くには、文字列 ERROR が含まれ、これらのメッセージを見つけやすくするために、ファイル名 com.snowflake.kafka.connector... が含まれます。

発生する可能性のあるエラーには次があります。

構成エラー

考えられるエラーの原因は次のとおりです。

  • コネクタには、トピックをサブスクライブするための適切な情報がありません。

  • コネクタには、Snowflakeテーブルに書き込むための適切な情報がありません(例:認証のキーペアが間違っている可能性があります)。

Kafkaコネクタはそのパラメーターを検証することに注意してください。コネクタは、互換性のない構成パラメーターごとにエラーをスローします。エラーメッセージは、Kafka Connectクラスタのログファイルに書き込まれます。構成の問題が疑われる場合は、そのログファイルのエラーを確認してください。

読み取りエラー

コネクタは、次の理由でKafkaから読み取ることができなかった可能性があります。

  • KafkaまたはKafka Connectが実行されていない可能性があります。

  • メッセージはまだ送信されていない可能性があります。

  • メッセージは削除された可能性があります(期限切れ)。

書き込みエラー(ステージ)

考えられるエラーの原因は次のとおりです。

  • ステージでの権限が不十分です。

  • ステージのスペースが不足しています。

  • ステージがドロップされています。

  • 他のユーザーまたはプロセスによるステージへの予期しないファイルの書き込みがあります。

書き込みエラー(テーブル)

考えられるエラーの原因は次のとおりです。

  • テーブルに対する権限が不十分です。

ステップ3:Kafka Connectを確認する

Kafka接続ログファイルにエラーが報告されていない場合は、Kafka Connectを確認してください。トラブルシューティングの手順については、Apache Kafkaソフトウェアベンダーが提供するドキュメントをご参照ください。

特定の問題の解決

同じトピックパーティションとオフセットにより重複する行

バージョン1.4のKafkaコネクタ(またはそれ以上)を使用してデータをロードする場合、同じトピックパーティションとオフセットを持つターゲットテーブルの行が重複していると、ロード操作がデフォルトの実行タイムアウト300000ミリ秒(300秒)を超えたことを示している可能性があります。原因を確認するには、Kafka Connectログファイルで次のエラーを確認します。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)

エラーを解決するには、Kafka設定ファイル(例: <kafka_dir>/config/connect-distributed.properties)で、以下のプロパティの いずれか を変更します。

consumer.max.poll.interval.ms

実行タイムアウトを 900000 (900秒)に増やす。

consumer.max.poll.records

各操作で読み込まれるレコード数を 50 に減らす。

問題の報告

Snowflakeサポート にサポートを依頼する場合は、次のファイルを準備してください。

  • Kafkaコネクタの構成ファイル。

    重要

    ファイルをSnowflakeに提供する前に、秘密キーを削除してください。

  • Kafkaコネクタのログのコピー。ファイルに部外秘情報や機密情報が 含まれていない ことを確認してください。