Kafkaコネクタのトラブルシューティング

このセクションでは、Kafkaコネクタを使用してデータを取り込む際に発生した問題のトラブルシューティング方法について説明します。

このトピックの内容:

エラー通知

Snowpipeのエラー通知を構成します。読み込み中にSnowpipeでファイルエラーが発生すると、この機能は設定されたクラウドメッセージングサービスに通知をプッシュし、データファイルの分析を可能にします。詳細については、 Snowpipeのエラー通知 をご参照ください。

一般的なトラブルシューティングステップ

Kafkaコネクタを使用したロードの問題をトラブルシューティングするには、次のステップを実行します。

ステップ1: テーブルの COPY 履歴を表示する

ターゲットテーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY ビュー をご参照ください。 COPY_HISTORY 出力に予想されるファイルのセットが含まれていない場合は、以前の期間をクエリします。ファイルが以前のファイルの複製であった場合は、元のファイルをロードしようとしたときに、ロード履歴にアクティビティが記録されていた可能性があります。 STATUS 列は、特定のファイルセットがロードされたか、部分的にロードされたか、またはロードに失敗したかを示します。 FIRST_ERROR_MESSAGE 列は、試行が部分的にロードされたか失敗した場合の理由を示します。

Kafkaコネクタは、ロードできなかったファイルをターゲットテーブルに関連付けられたステージに移動します。テーブルステージを参照するための構文は、 @[namespace.]%table_name です。

LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。

例:

LIST @mydb.public.%mytable;
Copy

ファイル名は次のいずれかの形式です。各フォーマットを生成する条件を表に示します。

ファイルタイプ

説明

生バイト

これらのファイルは、次のパターンに一致します。

<コネクタ名>/<テーブル名>/<パーティション>/offset_(<キー>/<値>_)<タイムスタンプ>.gz

これらのファイルの場合、Kafkaの記録は、生バイトからソースファイル形式(Avro、 JSON、またはProtobuf)に変換できませんでした。

この問題の一般的な原因は、ネットワーク障害が原因で文字がレコードから削除されることです。Kafkaコネクタは生バイトを解析できなくなったため、レコードが壊れました。

ソースファイル形式(Avro、 JSON、またはProtobuf)

これらのファイルは、次のパターンに一致します。

<コネクタ名>/<テーブル名>/<パーティション>/<開始オフセット>_<終了オフセット>_<タイムスタンプ>.<ファイルタイプ>.gz

これらのファイルの場合、Kafkaコネクタが生バイトをソースファイル形式に変換し直した後にSnowpipeでエラーが発生し、ファイルをロードできませんでした。

次のセクションでは、各ファイルタイプの問題を解決するための手順について説明しています。

生バイト

ファイル名 <コネクタ名>/<テーブル名>/<パーティション>/offset_(<キー>/<値>_)<タイムスタンプ>.gz には、生バイトからソースファイル形式に変換されなかったレコードの正確なオフセットが含まれます。問題を解決するには、レコードを新しいレコードとしてKafkaコネクタに再送信します。

ソースファイル形式(Avro、 JSON、またはProtobuf)

Snowpipeが、Kafkaトピック用に作成された内部ステージのファイルからデータをロードできなかった場合、ソースファイル形式にあるターゲットテーブルのために、Kafkaコネクタはファイルをステージに移動します。

ファイルのセットに複数の問題がある場合、 COPY_HISTORY 出力の FIRST_ERROR_MESSAGE 列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、テーブルステージからファイルを取得し、それらを名前付きステージにアップロードしてから、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS に設定して COPY INTO <テーブル> ステートメントを実行する必要があります。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Kafkaコネクタを使用してロードしようとしたファイルのセットを参照します。

データファイルの問題が解決したら、1つ以上の COPY ステートメントを使用してデータを手動でロードできます。

次の例は、 mydb.public データベースとスキーマの mytable テーブルのテーブルステージにあるデータファイルを参照します。

テーブルステージでデータファイルを検証し、エラーを解決するには、

  1. LIST を使用して、テーブルステージにあるすべてのファイルを一覧表示します。

    例:

    LIST @mydb.public.%mytable;
    
    Copy

    このセクションの例では、 JSON がデータファイルのソース形式であると想定しています。

  2. GET を使用して、Kafkaコネクタによって作成されたファイルをローカルマシンにダウンロードします。

    たとえば、ローカルマシンの data という名前のディレクトリにファイルをダウンロードします。

    Linuxまたは macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. CREATE STAGE を使用して、ソースKafkaファイルと同じ形式のデータファイルを保存する名前付き内部ステージを作成します。

    たとえば、 JSON ファイルを保存する kafka_json という名前の内部ステージを作成します。

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. PUT を使用して、テーブルステージからダウンロードしたファイルをアップロードします。

    たとえば、ローカルマシンの data ディレクトリにダウンロードしたファイルをアップロードします。

    Linuxまたは macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. テスト目的で、2つのバリアント列を持つ仮テーブルを作成します。このテーブルは、ステージングされたデータファイルを検証する目的のみに使用されます。データはテーブルにロードされません。現在のユーザーセッションが終了すると、テーブルは自動的にドロップされます。

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. COPY INTO *テーブル* ... VALIDATION_MODE = 'RETURN_ALL_ERRORS' ステートメントを実行して、データファイルで発生したすべてのエラーを取得します。このステートメントは、指定されたステージでファイルを検証します。データはテーブルにロードされません。

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. ローカルマシンのデータファイルで報告されたすべてのエラーを修正します。

  8. PUT を使用して、固定ファイルをテーブルステージまたは指定された内部ステージのいずれかにアップロードします。

    次の例では、ファイルをテーブルステージにアップロードし、既存のファイルを上書きします。

    Linuxまたは macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. VALIDATION_MODE オプション なし で COPY INTO テーブル を使用し、データをターゲットテーブルにロードします。

    オプションで PURGE = TRUE コピーオプションを使用して、データが正常にロードされたらステージからデータファイルを削除するか、 REMOVE を使用して、テーブルステージから手動でファイルを削除します。

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

ステップ2: Kafkaコネクタのログファイルを分析する

COPY_HISTORY ビューにデータロードの記録がない場合は、Kafkaコネクタのログファイルを分析します。コネクタは、イベントをログファイルに書き込みます。Snowflake Kafkaコネクタは、すべてのKafkaコネクタプラグインと同じログファイルを共有することに注意してください。このログファイルの名前と場所は、Kafkaコネクタの構成ファイルにある必要があります。詳細については、Apache Kafkaソフトウェア用に提供されているドキュメントをご参照ください。

Snowflake関連のエラーメッセージについては、Kafkaコネクタのログファイルを検索してください。メッセージの多くには、文字列 ERROR が含まれ、これらのメッセージを見つけやすくするために、ファイル名 com.snowflake.kafka.connector... が含まれます。

発生する可能性のあるエラーには次があります。

構成エラー:

考えられるエラーの原因は次のとおりです。

  • コネクタには、トピックをサブスクライブするための適切な情報がありません。

  • コネクタには、Snowflakeテーブルに書き込むための適切な情報がありません(例:認証のキーペアが間違っている可能性があります)。

Kafkaコネクタはそのパラメーターを検証することに注意してください。コネクタは、互換性のない構成パラメーターごとにエラーをスローします。エラーメッセージは、Kafka Connectクラスタのログファイルに書き込まれます。構成の問題が疑われる場合は、そのログファイルのエラーを確認してください。

読み取りエラー:

コネクタは、次の理由でKafkaから読み取ることができなかった可能性があります。

  • KafkaまたはKafka Connectが実行されていない可能性があります。

  • メッセージはまだ送信されていない可能性があります。

  • メッセージは削除された可能性があります(期限切れ)。

書き込みエラー(ステージ):

考えられるエラーの原因は次のとおりです。

  • ステージでの権限が不十分です。

  • ステージのスペースが不足しています。

  • ステージがドロップされています。

  • 他のユーザーまたはプロセスによるステージへの予期しないファイルの書き込みがあります。

書き込みエラー(テーブル):

考えられるエラーの原因は次のとおりです。

  • テーブルに対する権限が不十分です。

ステップ3:Kafka Connectを確認する

Kafka接続ログファイルにエラーが報告されていない場合は、Kafka Connectを確認してください。トラブルシューティングの手順については、Apache Kafkaソフトウェアベンダーが提供するドキュメントをご参照ください。

特定の問題の解決

同じトピックパーティションとオフセットにより重複する行

バージョン1.4のKafkaコネクタ(またはそれ以上)を使用してデータをロードする場合、同じトピックパーティションとオフセットを持つターゲットテーブルの行が重複していると、ロード操作がデフォルトの実行タイムアウト300000ミリ秒(300秒)を超えたことを示している可能性があります。原因を確認するには、Kafka Connectログファイルで次のエラーを確認します。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

エラーを解決するには、Kafka設定ファイル(例: <kafka_dir>/config/connect-distributed.properties)で、以下のプロパティの いずれか を変更します。

consumer.max.poll.interval.ms

実行タイムアウトを 900000 (900秒)に増やす。

consumer.max.poll.records

各操作で読み込まれるレコード数を 50 に減らす。

ストリーミングチャンネルオフセットの移行に失敗しました: 5023

v2.1.0(またはそれ以上)のコネクタバージョンにアップグレードする際、Snowpipeストリーミングチャンネルの名前フォーマットに変更がありました。その結果、以前にコミットされたオフセットに関する情報を検出するロジックは、以前にコミットされたオフセットに関する情報を見つけることができません。これは次のような例外として現れます。

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023

Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support

Message: Snowflake experienced a transient exception, please retry the migration request.
Copy

このエラーを解決するには、Kafka設定ファイル(たとえば、 <kafka_dir >/config/connect-distributed.properties)に、次の設定プロパティを追加します。

enable.streaming.channel.offset.migration

自動オフセット移動を無効にするには、 false に設定します。

複数のトピックをサポートするコネクタの設定

1つのkafkaコネクタインスタンスで、それぞれが複数のパーティションを持つ多数のトピックをサポートする問題が発生しました。コネクタの構成は有効であるように見えましたが、Snowflakeにデータを取り込むことができないまま、再バランスサイクルが延々と続きました。この問題はSnowpipe Streamingインジェスチョンモード(snowflake.ingestion.method=SNOWPIPE_STREAMING)特有のものでしたが、ガイドラインはSnowpipeインジェスチョンモード(snowflake.ingestion.method=SNOWPIPE)にも適用できます。この問題は、このログメッセージを繰り返しログファイルに記録することで顕在化します。

[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed

これは通常、正規表現でトピックを取り込むようにコネクタを構成した場合に発生します。Kafka設定ファイルには、以下のオプションセットを適用することをお勧めします(例: <kafka_dir >/config/connect-distributed.properties)。

consumer.override.partition.assignment.strategy

タスクへのパーティション割り当て戦略を org.apache.kafka.clients.consumer.CooperativeStickyAssignor として設定します。これにより、インジェストされたチャンネルが利用可能なタスクに均等に分配され、再バランスのリスクが軽減されます。

tasks.max

コネクタごとにインスタンス化されたタスクの数は、利用可能な CPU の数を超えてはなりません。基礎となるドライバーは、利用可能な CPU に基づいてスロットリングメカニズムを実装します。同時リクエストの数が増えると、システムのメモリへの負担が増えるだけでなく、インサートの処理時間が長くなり、コネクタのハートビートの欠落に直結します。

コネクタのタイムアウト値について言えば、これらに直接影響する一連の設定プロパティがあります。

consumer.override.heartbeat.interval.ms

監視スレッド(各タスクに1つずつ関連付けられます)がKafkaにハートビートを送信する頻度を定義します。デフォルトは 3000 msですが、システム負荷が高い場合は、 5000 msに増やすこともできます。

consumer.override.session.timeout.ms

コンシューマーが無効な状態であると仮定し、再バランスを試みるまでのブローカの待機時間を定義します。この設定は通常、ハートビート間隔の3倍以上でなければなりません。ハートビートを 5000 msに設定した場合、この設定は 15000 msに設定します。

consumer.override.max.poll.interval.ms

基盤となるKafkaから poll() を呼び出す最大間隔を定義します。ポーリングとポーリングの間に費やされる時間は、基本的にコネクタがデータのバッチを処理する時間(Snowflakeへのアップロードとコミットを含む)に相当します。複数のタスクがデータを処理するシナリオでは、基礎となるSnowflake Connectionがリクエストのスロットルを開始し、処理時間が長くなる可能性があります。シナリオによっては、この値を20分(1200000 ms)に増やすこともできます。特に、取り込むべき初期記録数が多い状態でコネクタを開始する場合です。

consumer.override.rebalance.timeout.ms

再バランスが発生した場合、タスクごとに多数のチャンネルがあるシナリオでは、処理を再開する場所を特定するために、チャンネルごとに多くの基本ロジックが存在します。このコードは順次実行されるため、1タスクあたりのチャンネル数が多ければ多いほど、初期セットアップに時間がかかります。このプロパティは、各チャンネルが初期化を完了するのに十分な大きさの値に設定します。3分(180000 ms)という値は、良い出発点です。

また、コネクタで使用可能なヒープメモリーに注意することも重要です。これは、複数のコネクタが同時に実行されたり、1つのコネクタが複数のトピックからデータを取り込んだりするシナリオで特に重要です。各トピックのパーティションは1つのチャンネルにマッピングされるため、メモリが必要となります。

Xmx設定でKafka接続プロセスのメモリ設定を調整してください。その1つの方法は、 KAFKA_OPTS 環境変数を定義し、それに従って設定することです(つまり、 KAFKA_OPTS=-Xmx4G)。

ファイルクリーナーでファイルが予期せずパージされる

Kafkaコネクタを SNOWPIPE で使用する場合、複数のトピックから単一のテーブルにデータを取り込む問題が発生する可能性があります。コンフィギュレーションに snowflake.topic2table.map エントリーがないか、トピックとテーブルの間に1:1のマッピングがある場合、この問題は適用されません。

Kafkaコネクタは、ステージにアップロードする記録のファイルを生成しています。これらのファイルは、次のパターンに従ってフォーマットされます。 snowflake_kafka_connector_<コネクター名>_stage_<テーブル名>/<コネクタ名>/<テーブル名>/<パーティションID>/<低ウォーターマーク>_<高ウォーターマーク>_<タイムスタンプ>.json.gz。この問題は、 <パーティションID> にあります。複数のトピックが1つのテーブルにデータをロードする場合、 partition-id の値で重複が発生する可能性があります。これは通常のコネクタ操作では問題ありません。しかし、コネクタが再起動またはリバランスすると、クリーナープロセスがステージにロードされた(まだインジェストされていない)ファイルを間違ったパーティションに不正確に関連付け、削除を決定する可能性があり、データ損失イベントが発生する可能性があります。

バージョン2.4.xのコネクタでは、 partition-id にソーストピックのハッシュコードを含めることにより、1つのトピックのパーティションに正確に一致する一意のファイル名を確保することで、この問題を修正しています。この修正はデフォルト(snowflake.snowpipe.stageFileNameExtensionEnabled)で有効になっており、 snowflake.topic2table.map でターゲットテーブルが複数回リストされている設定にのみ影響します。

設定がこの機能の影響を受けると、ステージにアップロードされたファイルが古くなってしまうかもしれません。コネクタが起動すると、ステージにそのようなファイルがあるかどうかをチェックします。検出されたファイルのリストに続いて、 NOTE: For table で始まるログエントリーを探す必要があります。

また、そのステージで影響を受けるファイルがあるかどうかを手動で確認することもできます。

  1. 影響を受けたステージを見つける

    show stages like 'snowflake_kafka_connector%<your table name>';
    
    Copy
  2. ステージファイルをリストする

    list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
    
    Copy

上記のコマンドは、テーブルのステージに一致し、パーティション IDs が0~9999の範囲にあるファイルをすべてリストアップします。これらのファイルはもうインジェストされないので、ダウンロードするか削除してください。

問題の報告

Snowflakeサポート にサポートを依頼する場合は、次のファイルを準備してください。

  • Kafkaコネクタの構成ファイル。

    重要

    ファイルをSnowflakeに提供する前に、秘密キーを削除してください。

  • Kafkaコネクタのログのコピー。ファイルに部外秘情報や機密情報が 含まれていない ことを確認してください。

  • JDBC ログファイル。

    ログファイルを生成するには、Kafkaコネクタを実行する前に、KafkaConnectクラスターで JDBC_TRACE = true 環境変数を設定します。

    JDBC ログファイルの詳細については、Snowflakeコミュニティの この記事 をご参照ください。

  • ログファイルを接続します。

    ログファイルを作成するには、 etc/kafka/connect-log4j.properties ファイルを編集します。 log4j.appender.stdout.layout.ConversionPattern プロパティを次のように設定します。

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    コネクタコンテキストは、Kafkaバージョン2.3またはそれ以上で使用できます。

    詳細については、Confluentウェブサイトの ログの改善 情報をご参照ください。