Java Management Extensions(JMX)を使用したKafkaコネクタのモニター

このトピックでは、Java Management Extensions(JMX)を使用してKafka用Snowflakeコネクタをモニターする方法について説明します。Kafka Connectは、Kafkaコネクタに関する情報を提供する事前設定された JMX メトリックを提供します。Kafka用Snowflakeコネクタは、Kafka環境に関するメトリックを取り込むために使用できる複数のManaged Beans(MBeans)を提供します。この情報は、PrometheusやGrafanaなどのサードパーティツールに読み込むことができます。

JMX 機能は、コネクタではデフォルトで有効になっています。JMX を無効にするには、 jmx プロパティを false に設定します。

重要

Snowpipe は、Kafkaコネクタバージョン1.6.0以降をサポートします。

Snowpipe Streaming は、Kafkaコネクタバージョン2.1.2以降をサポートします。

このトピックの内容:

Kafkaコネクタでの JMX の構成

JMX は、Kafka用Snowflakeコネクタではデフォルトで有効になっています。Kafkaで JMX を有効にするには、次を実行します。

  1. JMX を有効にしてKafkaインストールに接続します。

    • リモートサーバーで実行されているKafkaインストールに JMX 接続するには、KafkaConnect起動スクリプトで KAFKA_JMX_OPTS 環境変数を設定します。

      export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
          -Dcom.sun.management.jmxremote.authenticate=false
          -Dcom.sun.management.jmxremote.ssl=false
          -Djava.rmi.server.hostname=<ip_address>
          -Dcom.sun.management.jmxremote.port=<jmx_port>"
      
      Copy

      条件:

      • ip_address: Kafka Connectインストールの IP アドレスを指定します。

      • jmx_port: Kafka Connectが JMX 接続をリッスンする JMX ポートを指定します。

    • 同じサーバーで実行されているKafkaに JMX 接続するには、Kafka起動スクリプトで JMX_PORT 環境変数を設定します。

      export JMX_PORT=<port_number>
      
      Copy

      ここで、 port_number はKafkaインストールの JMX ポートです。

  2. Kafkaコネクタを再起動します。

Snowflake KafkaコネクタManaged Beansの使用(MBeans)

JMX は MBeans を使用して、モニターできるKafka内のオブジェクト(例: スレッド数、CPU負荷など)を表します。Snowflake Kafkaコネクタは、コネクタによって管理されるオブジェクトにアクセスするための MBeans を提供します。これらの MBeans を使用して、モニターダッシュボードを作成できます。

Kafkaコネクタ MBean オブジェクト名の一般的な形式は次のとおりです。

snowflake.kafka.connector:connector=connector_name,pipe=pipe_name,category=category_name,name=metric_name

条件:

  • connector=connector_name は、Kafka構成ファイルで定義されているコネクタの名前を指定します。

  • pipe=pipe_name は、データのインジェストに使用されるSnowpipeオブジェクトを指定します。Kafkaコネクタは、各パーティションのSnowpipeオブジェクトを定義します。

  • category=category_name は、 MBean のカテゴリを指定します。各カテゴリには、一連のメトリックが含まれています。

  • name=metric_name は、メトリックの名前を指定します。

次のセクションでは、Snowflake Kafkaコネクタによって提供されるカテゴリとメトリックの名前をリストします。

カテゴリ: file-counts

このカテゴリのメトリックは、SnowpipeベースのKafkaコネクタにのみ適用され、Snowpipe Streamingには適用されません。

メトリック名

データ型

説明

file-count-on-stage

long

現在、内部ステージにあるファイルの数。この値は、ファイルのパージプロセスが開始された後に減少します。このプロパティは、現在内部ステージにあるファイルの数の推定を提供します。

file-count-on-ingestion

long

insertFiles REST API を呼び出すことによって決定されたSnowpipe内のファイルの数。現在、単一の REST API リクエストを介して送信されるファイルには5kの制限があります。ファイルの数と REST API 呼び出しの数の間には1対1の関係はありません。 insertFiles REST API への呼び出しの数は、この値より大きくなる可能性があります。インジェストするファイルがこれ以上ない場合、このプロパティの値は 0 です。

file-count-table-stage-ingestion-fail

long

インジェスチョンに失敗したテーブルステージ上のファイルの数。

file-count-table-stage-broken-record

long

壊れたオフセットに対応する、テーブルステージに存在するファイルの数。

file-count-purged

long

インジェスチョンステータスが決定された後、内部ステージからパージされたファイルの数。

カテゴリ: offsets

offsetPersistedInSnowflake および latestConsumerOffset のメトリックは、Snowpipe StreamingベースのKafkaコネクタに適用されます。このカテゴリの残りは、SnowpipeベースのKafkaコネクタにのみ適用されます。

メトリック名

データ型

説明

processed-offset

long

メモリ内のバッファーに送信された最新の記録を参照するオフセット。

flushed-offset

long

バッファーのしきい値に達した後、内部ステージでフラッシュされている記録を参照するオフセット。バッファーは、時間、記録数、またはサイズによってしきい値に達する可能性があります。

committed-offset

long

事前にコミットした API が呼び出され、Snowpipe insertFiles REST API が呼び出された記録を参照するオフセット。

purged-offset

long

内部ステージからパージされている記録を参照するオフセット。この数値は、内部ステージからパージされた最新のオフセットの最大値です。

offsetPersistedInSnowflake

long

Snowflakeで最新の永続データを持つ記録を参照するオフセット。オフセットは insertRows API コールで決定されます。

latestConsumerOffset

long

メモリ内のバッファーに送信された最新の記録を参照するオフセット。チャネルオフセットトークンが NULL の場合にのみ、オフセットの再送に使用されます。

カテゴリ: buffer

このカテゴリのメトリックは、SnowpipeベースのKafkaコネクタでのみ使用できます。

メトリック名

データ型

説明

buffer-size-bytes

long

バッファーのしきい値に基づいて、内部ステージにフラッシュされる前のバッファーサイズ(バイト単位)を返します。ファイルは内部ステージにロードされるときに圧縮されるため、この値はファイルサイズと同じではない場合があります。

buffer-record-count

long

バッファーのしきい値に基づいて、バッファーが内部ステージにフラッシュされる前にメモリにバッファーされたKafka記録の数を返します。

カテゴリ: latencies

このカテゴリのメトリックは、SnowpipeベースのKafkaコネクタでのみ使用できます。

メトリック名

データ型

説明

kafka-lag

long

記録がKafkaに入力された時間と、記録がKafka Connectにフェッチされた時間の差(秒単位)。値が記録内に設定されていない場合、この値はnullになる可能性があることに注意してください。

commit-lag

long

ファイルが内部ステージにアップロードされた時間と insertFiles REST API が呼び出された時間の差(秒単位)。

ingestion-lag

long

ファイルが内部ステージにアップロードされた時間と、ファイルのインジェスチョンステータスが insertReport または loadHistoryScan API を介して報告された時間との差(秒単位)。