Monitor the Snowflake Connector for Kafka

This topic describes how to monitor the Snowflake Connector for Kafka.

JMX monitoring

The connector exposes metrics as Java Management Extensions (JMX) MBeans. JMX metrics are enabled by default: the connector configuration property jmx controls them and defaults to true.

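To read these metrics from outside the worker process, start the Kafka Connect worker JVM with the following system properties. These settings expose JMX on <port> with authentication and SSL disabled: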

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=<port>
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

You can then use any JMX-compatible monitoring tool (for example, JConsole or Prometheus with a JMX exporter) to view the connector metrics.
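As a minimal sketch, assuming the worker was started with the properties above, the following Java program connects through the standard RMI JMX service URL and prints every MBean registered under the connector's domain. The host and port 9999 are placeholders for your own <port> value, and the connection passes no credentials because authentication is disabled in that configuration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class ListConnectorMBeans {

    // Default RMI service URL for a JVM started with
    // -Dcom.sun.management.jmxremote.port=<port>.
    static String serviceUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    // Returns every MBean registered under the connector's JMX domain, sorted by name.
    static Set<ObjectName> connectorMBeans(MBeanServerConnection conn) {
        try {
            ObjectName pattern = new ObjectName("snowflake.kafka.connector:*");
            return new TreeSet<>(conn.queryNames(pattern, null));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999; // placeholder port
        JMXServiceURL url = new JMXServiceURL(serviceUrl(host, port));
        // No credentials are supplied, matching jmxremote.authenticate=false.
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
            for (ObjectName name : connectorMBeans(jmxc.getMBeanServerConnection())) {
                System.out.println(name);
            }
        }
    }
}
```

Run it with the worker host and port as arguments. If nothing is printed, the connector has not registered any MBeans yet, or jmx is set to false.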

MBean domain and naming

The connector registers MBeans under the domain snowflake.kafka.connector with the following ObjectName pattern:

snowflake.kafka.connector:connector=<connectorName>,<scope>=<scopeValue>,category=<category>,name=<metricName>

Here, <scope> is either task (for task-level metrics) or channel (for per-partition channel metrics).
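The sketch below builds query patterns for this naming scheme with the standard javax.management.ObjectName class. Because the possible category values are not listed here, each pattern ends in ",*" so that it matches whatever category the connector registers. The connector name my_connector and the helper names are hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class ConnectorObjectNames {

    static final String DOMAIN = "snowflake.kafka.connector";

    // Parses an ObjectName, turning the checked exception into an unchecked one.
    static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid ObjectName: " + name, e);
        }
    }

    // Pattern for one metric in one scope, for example
    //   snowflake.kafka.connector:connector=my_connector,task=0,name=put-records,*
    // scope is "task" or "channel". Values are inserted unquoted, so they must not
    // contain ',', '=', ':' or '"'; '*' and '?' act as value wildcards (task=* matches every task).
    static ObjectName metricPattern(String connector, String scope, String scopeValue, String metric) {
        return parse(DOMAIN + ":connector=" + connector + "," + scope + "=" + scopeValue
                + ",name=" + metric + ",*");
    }

    // Pattern for every metric of one connector, across all tasks and channels.
    static ObjectName connectorPattern(String connector) {
        return parse(DOMAIN + ":connector=" + connector + ",*");
    }

    // Returns "task" or "channel" depending on which scope key the name carries, or null.
    static String scopeOf(ObjectName name) {
        if (name.getKeyProperty("task") != null) {
            return "task";
        }
        if (name.getKeyProperty("channel") != null) {
            return "channel";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(metricPattern("my_connector", "task", "0", "put-records"));
        System.out.println(connectorPattern("my_connector"));
    }
}
```

Pass either pattern to MBeanServerConnection.queryNames to find the matching MBeans. ObjectName.apply checks a single name against a pattern without contacting a server.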

Task-level metrics

These metrics are scoped to a connector task and provide aggregate visibility into throughput and lifecycle operations.

Metric | Type | Description
put-records | Meter | Number of records received by the task via put(). Use the rate attributes (1-minute, 5-minute, 15-minute) to monitor ingestion throughput.
put-duration | Timer | Duration of each put() call. High values can indicate connector bottlenecks or downstream backpressure.
precommit-duration | Timer | Duration of pre-commit operations, which include offset verification.
assigned-partitions | Gauge | Current number of Kafka partitions assigned to this task. Useful for verifying that partitions are evenly distributed across tasks.
channel-open-duration | Timer | Time taken to open Snowpipe Streaming channels. Elevated values may indicate connection issues.
channel-open-count | Counter | Total number of channels opened by this task.
backpressure-rewind-count | Counter | Number of times the connector had to rewind due to downstream backpressure. A value that keeps increasing indicates that the connector is sending data faster than Snowflake can ingest it.
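Meters and timers each expose several attributes. The following sketch reads one numeric attribute from every MBean that matches a pattern. It assumes Dropwizard-style attribute names (OneMinuteRate for meters, 99thPercentile for timers); confirm the actual attribute names in JConsole before relying on them. As written, main uses the platform MBean server, which only sees connector MBeans when the code runs inside the Connect worker JVM; for a remote worker, pass the connection obtained from JMXConnectorFactory instead.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class TaskMetricReader {

    // Parses an ObjectName pattern, turning the checked exception into an unchecked one.
    static ObjectName pattern(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid ObjectName: " + name, e);
        }
    }

    // Reads one numeric attribute from every MBean matching the pattern, keyed by MBean name.
    // Non-numeric attribute values are skipped.
    static Map<ObjectName, Double> readAll(MBeanServerConnection conn, ObjectName pattern, String attribute) {
        Map<ObjectName, Double> values = new TreeMap<>();
        try {
            for (ObjectName name : conn.queryNames(pattern, null)) {
                Object value = conn.getAttribute(name, attribute);
                if (value instanceof Number) {
                    values.put(name, ((Number) value).doubleValue());
                }
            }
        } catch (IOException | JMException e) {
            throw new IllegalStateException("Failed to read " + attribute + " from " + pattern, e);
        }
        return values;
    }

    public static void main(String[] args) {
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        String connector = args.length > 0 ? args[0] : "my_connector"; // hypothetical connector name
        // ASSUMPTION: Dropwizard-style attribute names "OneMinuteRate" and "99thPercentile".
        Map<ObjectName, Double> putRate = readAll(conn,
                pattern("snowflake.kafka.connector:connector=" + connector + ",task=*,name=put-records,*"),
                "OneMinuteRate");
        Map<ObjectName, Double> putP99 = readAll(conn,
                pattern("snowflake.kafka.connector:connector=" + connector + ",task=*,name=put-duration,*"),
                "99thPercentile");
        double totalRate = putRate.values().stream().mapToDouble(Double::doubleValue).sum();
        System.out.println("put-records 1-minute rate, summed across tasks: " + totalRate);
        putP99.forEach((name, p99) ->
                System.out.println("task " + name.getKeyProperty("task") + " put-duration p99: " + p99));
    }
}
```

Summing rates across tasks gives connector-wide throughput, but percentiles cannot be summed meaningfully, which is why put-duration is reported per task.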

Channel-level metrics

These metrics are scoped to a specific Kafka topic-partition channel and are essential for monitoring ingestion lag and data durability.

Metric | Type | Description
processed-offset | Gauge | The most recent offset buffered by the connector, that is, the latest record received from Kafka for this partition.
persisted-in-snowflake-offset | Gauge | The latest offset confirmed as durably committed in Snowflake. Compare this to processed-offset to measure ingestion lag.
latest-consumer-offset | Gauge | The latest offset available from the Kafka consumer for this partition. Compare this to persisted-in-snowflake-offset to measure the full end-to-end lag.
channel-recovery-count | Gauge | Number of channel recovery events. A high or increasing value indicates instability in the Snowpipe Streaming channel.
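The two lag measures in the table above are simple differences between gauge values. This sketch reads the three offset gauges for one channel and computes both. It assumes that each gauge exposes its value through an attribute named Value (the Dropwizard convention) and that offsets are contiguous, so that an offset difference approximates a record count. The channel argument is whatever value appears in the channel= key of the MBean name; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.util.Set;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class ChannelLag {

    // Offsets read from one channel's gauges. The three reads are not atomic, so
    // the differences are clamped at zero to absorb small timing skew.
    static final class Offsets {
        final long processed;
        final long persisted;
        final long latestConsumer;

        Offsets(long processed, long persisted, long latestConsumer) {
            this.processed = processed;
            this.persisted = persisted;
            this.latestConsumer = latestConsumer;
        }

        // Records buffered by the connector but not yet confirmed in Snowflake.
        long bufferedLag() {
            return Math.max(0L, processed - persisted);
        }

        // Records available in Kafka but not yet confirmed in Snowflake.
        long endToEndLag() {
            return Math.max(0L, latestConsumer - persisted);
        }
    }

    // Reads one channel gauge. ASSUMPTION: the value is exposed as the "Value" attribute.
    static long readGauge(MBeanServerConnection conn, String connector, String channel, String metric) {
        try {
            ObjectName pattern = new ObjectName("snowflake.kafka.connector:connector=" + connector
                    + ",channel=" + channel + ",name=" + metric + ",*");
            Set<ObjectName> names = conn.queryNames(pattern, null);
            if (names.isEmpty()) {
                throw new IllegalStateException("No MBean matches " + pattern);
            }
            return ((Number) conn.getAttribute(names.iterator().next(), "Value")).longValue();
        } catch (IOException | JMException e) {
            throw new IllegalStateException("Could not read " + metric + " for channel " + channel, e);
        }
    }

    static Offsets read(MBeanServerConnection conn, String connector, String channel) {
        return new Offsets(
                readGauge(conn, connector, channel, "processed-offset"),
                readGauge(conn, connector, channel, "persisted-in-snowflake-offset"),
                readGauge(conn, connector, channel, "latest-consumer-offset"));
    }

    public static void main(String[] args) {
        Offsets sample = new Offsets(1_500L, 1_200L, 1_600L); // illustrative values only
        System.out.println("buffered lag = " + sample.bufferedLag()
                + ", end-to-end lag = " + sample.endToEndLag());
    }
}
```

With the illustrative values in main, the program prints "buffered lag = 300, end-to-end lag = 400".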

Key metrics for alerting

For production deployments, consider alerting on the following:

  • Ingestion lag: latest-consumer-offset minus persisted-in-snowflake-offset. A growing gap indicates the connector is falling behind.
  • Backpressure: backpressure-rewind-count increasing over time.
  • Channel recovery: channel-recovery-count increasing, which may indicate connectivity or authentication issues.
  • Put duration: put-duration mean or p99 exceeding your acceptable threshold.
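A minimal sketch of these checks compares two consecutive samples of the metrics. The Sample class and the alert names are hypothetical. A single two-sample comparison is noisy, so in practice you would require a condition to hold across several samples or over a time window before raising an alert.

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectorAlerts {

    // One sample of the alerting inputs, taken at a single point in time.
    static final class Sample {
        final long endToEndLag;         // latest-consumer-offset minus persisted-in-snowflake-offset
        final long backpressureRewinds; // backpressure-rewind-count (cumulative counter)
        final long channelRecoveries;   // channel-recovery-count
        final double putDurationP99;    // put-duration p99, in the same unit as the threshold

        Sample(long endToEndLag, long backpressureRewinds, long channelRecoveries, double putDurationP99) {
            this.endToEndLag = endToEndLag;
            this.backpressureRewinds = backpressureRewinds;
            this.channelRecoveries = channelRecoveries;
            this.putDurationP99 = putDurationP99;
        }
    }

    // Returns the names of the alerts that fire when moving from previous to current.
    static List<String> evaluate(Sample previous, Sample current, double putP99Threshold) {
        List<String> alerts = new ArrayList<>();
        if (current.endToEndLag > previous.endToEndLag) {
            alerts.add("ingestion-lag-growing");
        }
        if (current.backpressureRewinds > previous.backpressureRewinds) {
            alerts.add("backpressure");
        }
        if (current.channelRecoveries > previous.channelRecoveries) {
            alerts.add("channel-recovery");
        }
        if (current.putDurationP99 > putP99Threshold) {
            alerts.add("slow-put");
        }
        return alerts;
    }

    public static void main(String[] args) {
        Sample previous = new Sample(100, 5, 0, 40.0); // illustrative values only
        Sample current = new Sample(250, 5, 1, 120.0);
        System.out.println(evaluate(previous, current, 100.0));
    }
}
```

With these illustrative samples, main prints [ingestion-lag-growing, channel-recovery, slow-put]. Backpressure does not fire because the counter did not increase between samples.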

MDC logging

Set the following connector property to enable MDC (Mapped Diagnostic Context) logging, which prepends connector context to each log message. This helps you correlate log entries when you run multiple connector instances:

enable.mdc.logging=true

Estimating ingestion latency

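The SnowflakeConnectorPushTime field in RECORD_METADATA records the time, in milliseconds since the Unix epoch, at which the connector buffered the record for ingestion. You can estimate end-to-end ingestion latency by comparing this value with the time the record becomes queryable in Snowflake. The following query uses its own execution time (CURRENT_TIMESTAMP()) as that reference, so each result is an upper bound that is closest to the true latency for records ingested shortly before the query runs.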

For example:

SELECT
  RECORD_METADATA:topic::STRING AS topic,
  RECORD_METADATA:partition::NUMBER AS partition,
  RECORD_METADATA:offset::NUMBER AS offset,
  TIMESTAMPDIFF('millisecond',
    TO_TIMESTAMP_NTZ(RECORD_METADATA:SnowflakeConnectorPushTime::BIGINT, 3),
    CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())::TIMESTAMP_NTZ
  ) AS latency_ms
FROM my_table
ORDER BY latency_ms DESC
LIMIT 10;
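If you export push times and observation times for many records, a percentile summary can be easier to read than the ten largest values. This Java sketch mirrors the query's arithmetic (observation time minus SnowflakeConnectorPushTime, both in epoch milliseconds) and adds a nearest-rank percentile. The class and method names are hypothetical, and how you capture the observation time is up to you.

```java
import java.time.Instant;
import java.util.Arrays;

final class IngestionLatency {

    // Milliseconds between the connector push time (SnowflakeConnectorPushTime,
    // epoch milliseconds) and the moment the record was observed in Snowflake.
    static long latencyMillis(long pushTimeEpochMillis, Instant observedAt) {
        return observedAt.toEpochMilli() - pushTimeEpochMillis;
    }

    // Nearest-rank percentile, 0 < p <= 100, over a set of latency samples.
    // The input array is not modified.
    static long percentile(long[] latenciesMillis, double p) {
        if (latenciesMillis.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need at least one sample and 0 < p <= 100");
        }
        long[] sorted = latenciesMillis.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.min(rank, sorted.length) - 1];
    }

    public static void main(String[] args) {
        Instant observedAt = Instant.ofEpochMilli(10_000L); // illustrative values only
        long[] pushTimes = {9_950L, 9_700L, 9_900L, 9_400L, 9_800L};
        long[] latencies = new long[pushTimes.length];
        for (int i = 0; i < pushTimes.length; i++) {
            latencies[i] = latencyMillis(pushTimes[i], observedAt);
        }
        System.out.println("p50=" + percentile(latencies, 50) + " ms, p99=" + percentile(latencies, 99) + " ms");
    }
}
```

With the illustrative values in main, the program prints "p50=200 ms, p99=600 ms".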

For more information about monitoring Snowpipe Streaming ingestion, see Snowpipe Streaming key concepts.