Snowflake Connector for Kafka のトラブルシューティング

このトピックでは、 Snowflake Connector for Kafka の一般的な問題のトラブルシューティング方法について説明します。

取り込みエラー

チャネルレポートでrow_error_countが増加している

Snowpipe Streamingチャネルが rows_error_count の増加をレポートする場合、コネクタの動作は errors.tolerance 設定によって異なります。

  • errors.tolerance=none (デフォルト)では、コネクタタスクは ERROR_5030 で失敗します。

  • errors.tolerance=all に設定するとコネクタは動作を継続しますが、エラー数をログに記録します。

注釈

サーバー側の検証と errors.tolerance=none では、エラーは非同期的です。コネクタは次の事前コミットサイクルでエラーを検出するため、タスクが失敗する前に追加のレコードが取り込まれる場合があります。

調査するには:

  1. ターゲットテーブルに関連付けられたエラーテーブルをチェックして、失敗した記録を特定します。詳細については Snowpipe Streaming高性能アーキテクチャでのエラー処理 をご参照ください。

  2. RECORD_METADATA 列のKafkaオフセット情報で、 メタデータオフセットを使用したエラーの検出と復旧 で説明されているギャップ検出テクニックを使用します。

  3. エラーの詳細については、コネクタのログを確認してください(詳細なロギングを実施するために errors.log.enable=true を有効に設定)。

コネクタタスクが ERROR_5030で失敗する

ERROR_5030 は、コネクタがデータの取り込みエラーを検出したことを示します。一般的な原因には次のものがあります。

  • Kafkaレコードとターゲットテーブルスキーマ間のデータ型の不一致。

  • snowflake.validation=client_side が構成されている間はユーザーが作成したパイプが存在します。クライアント側の検証はデフォルトのパイプでのみ機能します。

  • 自動的に進化できないKafka記録のスキーマの変更。

解決するには:

  1. 特定の原因について、エラーメッセージとコネクタログを確認します。

  2. ユーザーが定義したパイプでクライアント側の検証を使用する場合は、 snowflake.validation=server_side に切り替えるか、ユーザーが定義したパイプを削除します。

  3. ソースKafkaトピックのデータを修正するか、ターゲットテーブルスキーマを調整します。

スキーマの進化の問題

サーバー側の検証では、スキーマの進化によって常に正しいデータ型を推測できるとは限りません。たとえば、バイナリ列を推論することはできず、 "2026-04-13" のような文字列を TEXT ではなく DATE として解釈する可能性があります。

スキーマの進化により予期しない列タイプが生成される場合:

  • より良好な型推論を実現するには、クライアント側の検証(snowflake.validation=client_side)を使用します。

  • コネクタを起動する前に、正しいスキーマでテーブルを事前に作成します。

注釈

コネクタはテーブルスキーマのみをキャッシュに保存します。コネクタの実行中にターゲットテーブルに対して同時 DDL 操作を行うと、未定義の動作が発生する可能性があります。コネクタがアクティブに取り込みを行っているテーブルに対して DDL を実行することは回避してください。

接続と認証の問題

認証失敗

v4コネクタは、キーペア認証のみをサポートします。一般的な認証の問題:

  • 無効なプライベートキー:snowflake.private.key 値が有効なBase64エンコードされた PKCS#8秘密キーであることを確認します。

  • キーパスフレーズ:キーが暗号化されている場合は、 snowflake.private.key.passphrase を正しいパスフレーズに設定してください。

  • ロールの権限:snowflake.role.name で指定されたロールに必要な権限が付与されていることを確認します。詳細については Snowflake Connector for Kafka:Snowflakeの構成 をご参照ください。

認可エラー

コネクタにSnowflakeからの認可エラーが発生した場合、動作は enable.task.fail.on.authorization.errors 設定によって異なります。

  • enable.task.fail.on.authorization.errors=false (デフォルト)では、コネクタは再試行します。

  • enable.task.fail.on.authorization.errors=true に設定すると、コネクタタスクは即座に失敗します。

構成に関する問題

スキーマ化でサポートされていないコンバーター

snowflake.enable.schematization=true (デフォルト)では、 StringConverter および ByteArrayConverter は値コンバーターとしてサポートされません。代わりに構造化コンバーターを使用してください。

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

v3構成プロパティの削除

認識されない構成プロパティに関するエラーが表示された場合は、v4で削除されたプロパティを使用しているかどうかを確認してください。削除された構成の全一覧については、 Migrate from Kafka connector v3 to v4 をご参照ください。

起動時に互換性検証が失敗する

コネクタの起動時に、構成値が欠落している、または互換性のない構成値に関するエラーが発生して失敗した場合は、互換性検証ツール(snowflake.streaming.validate.compatibility.with.classic)がv3移行要件に基づいて構成をチェックします。

  • 新規インストールの場合:snowflake.streaming.validate.compatibility.with.classic=false を設定してチェックをスキップします。

  • v3からの移行の場合:すべての必要な互換プロパティを明示的に設定します。全一覧については、 snowflake.streaming.validate.compatibility.with.classic をご参照ください。

パフォーマンス問題

取り込みラグが拡大中

latest-consumer-offset マイナス persisted-in-snowflake-offset のギャップが増加している(JMX メトリック によって表示可能)の場合は、コネクタの稼働速度が低下しています。

解決するには:

  • タスクを増やす:tasks.max をKafkaパーティションの総数に近い値に設定します。最適なパフォーマンスは、通常、Kafka Connectクラスタ全体で CPU コアあたり2つのタスクです。

  • バックプレッシャーを確認:backpressure-rewind-count メトリックが増加している場合、Snowpipe Streaming SDK は最大容量に達しています。Kafka Connectクラスターのスケールアウトを検討してください。

  • JVM メモリを確認:JVM のヒープを使用可能なメモリの約50%に制限します。RustベースのSnowpipe Streaming SDK はバッファリングにオフヒープメモリを使用します。これは JVM によって管理されません。

テーブルとパイプのキャッシュ保存

コネクタはテーブルとパイプの存在チェックをキャッシュに保存し、データベースクエリを削減します。コネクタが新しく作成されたテーブルやパイプを検出しない問題が発生した場合は、キャッシュの有効期限を調整してください。

snowflake.cache.table.exists.expire.ms=60000
snowflake.cache.pipe.exists.expire.ms=60000

中断されたチャネルの回復

一時的なチャネル回復は正常です。ただし、channel-recovery-count メトリックが継続的に増加している場合は、次のことを示している可能性があります。

  • コネクタのキャッシュ保存されたスキーマと競合するターゲットテーブルでのスキーマの変更。

  • コネクタのロールに影響する権限の変更。

  • Kafka ConnectクラスターとSnowflake間のネットワークの不安定性。

特定の復旧のために、コネクタのログを確認します。

SDK クライアントのリーク

sdk-client-count JMX メトリックが継続的に増加している場合は、コネクタがSnowpipe Streaming SDK クライアントをリークさせている可能性があります。個別のターゲットテーブルごとに1つの SDK クライアントが必要です。カウントが個別のテーブルの数を超える場合は、Snowflakeサポート に連絡してください。

移行に関する問題

SSv1 オフセットの移行中にチャネルが見つからない

snowflake.streaming.classic.offset.migration=strict を使用しているときに、チャネルが見つからないというエラーが発生してコネクタが失敗した場合:

  • v3デプロイと同じコネクタ名を使用していることを確認します。

  • snowflake.streaming.classic.offset.migration.include.connector.namesnowflake.streaming.channel.name.include.connector.name のv3設定と一致しているかどうかを確認します。

  • チャンネルがすでにクリーンアップされている場合、またはv3に存在しなかった新しいトピックを追加する場合は、 best_effort モードに切り替えます。

移行後の重複

v3からの移行後に重複した記録が表示された場合:

  • RECORD_METADATA にトピック、パーティション、オフセットフィールドが含まれていることを確認します。

  • Downgrading from v4 to v3 で重複排除クエリを使用て、重複を削除します。

ログ

Snowpipe Streaming SDK は詳細なログを生成できます。ログノイズを低減するには、Kafka Connectワーカーに以下の環境変数を設定します。

export SS_LOG_LEVEL=warn

コンテキスト付きの詳細なコネクタログ記録には、ログパターンを構成します。

CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN="[%d] %p %X{connector.context}%m (%c:%L)%n"

問題の報告

このガイドに記載されていない問題については、 Snowflakeサポート までお問い合わせください。