Monitor Openflow using telemetry data

This topic describes how to monitor the state of Openflow and troubleshoot problems.

Accessing Openflow logs

Snowflake sends Openflow logs to the event table you configured when you set up Openflow (BYOC | Snowflake deployment).

Snowflake recommends that you include a timestamp in the WHERE clause of event table queries. This is particularly important because of the potential volume of data generated by various Snowflake components. By applying filters, you can retrieve a smaller subset of data, which improves query performance.

To get started quickly with Openflow’s telemetry, see Example Queries below.

Openflow Telemetry Schema

For information about the event table columns, see Event table columns.

The following sections describe how Openflow structures telemetry in an Event Table.

Resource Attributes

Describes the event metadata set by Openflow. For general information on other types of resource attributes, see the RESOURCE_ATTRIBUTES column in the Event table columns documentation.

Name | Type | Description
application | String | The fixed value openflow
cloud.service.provider | String | One of aws, snowflake
container.id | String | Unique identifier of the container
container.image.name | String | Fully qualified name of the container image. All Openflow images are hosted by Snowflake repositories. For example, <account>-openflow-<env>.registry-internal.snowflakecomputing.com/openflow/openflow/openflow_repo/runtime-server
container.image.tag | String | Version of the container image
k8s.container.name | String | The name of the K8s container. Openflow Runtime container names start with the “Runtime Key” and end with -gateway or -server. For example, an Openflow Runtime named “PostgreSQL CDC” with a Runtime Key of postgresql-cdc would have container names of postgresql-cdc-gateway and postgresql-cdc-server.
k8s.container.restart_count | Numeric String | The number of times this container has restarted since it was created.
k8s.namespace.name | String | K8s namespace of the pod or container, starting with runtime- for Openflow Runtimes. Values also include kube-system and openflow-runtime-infra.
k8s.node.name | String | The internal domain name of the EKS node hosting the pod or container, or of the EKS node itself. For example, ip-10-12-13-144.us-west-2.compute.internal
k8s.pod.name | String | The name of the K8s pod. Openflow Runtime pod names start with the “Runtime Key” and end with a numeric identifier for each pod replica. Identifiers start at 0 and can grow up to the “Max Nodes” set for the Runtime. For example, an Openflow Runtime named “PostgreSQL CDC” with a Runtime Key of postgresql-cdc and 3 nodes would have pod names of postgresql-cdc-0, postgresql-cdc-1, and postgresql-cdc-2.
k8s.pod.start_time | ISO 8601 Date String | Timestamp that the pod was started
k8s.pod.uid | UUID String | Unique identifier of the pod within the cluster
deployment.version | String | The Openflow deployment version.
openflow.dataplane.id | UUID String | The unique identifier of the Openflow Deployment, matching the “ID” shown in the Snowflake Openflow UI through Deployment > View Details.

Resource Attributes Example:
{
  "application": "openflow",
  "cloud.service.provider": "aws",
  "container.id": "a1b2c3d4e5f6",
  "container.image.name": "example-openflow-prod.registry-internal.snowflakecomputing.com/openflow/openflow/openflow_repo/runtime-server",
  "container.image.tag": "2026.3.17.13",
  "deployment.version": "1.35.0",
  "k8s.container.name": "pg-dev-server",
  "k8s.container.restart_count": "0",
  "k8s.namespace.name": "runtime-pg-dev",
  "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal",
  "k8s.pod.name": "pg-dev-0",
  "k8s.pod.start_time": "2025-04-25T22:14:29Z",
  "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6",
  "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a"
}
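
As a minimal sketch of how these attributes can be used as filters, the following query returns recent log rows from the server container of a single Runtime. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and a hypothetical Runtime Key of postgresql-cdc; substitute your own table name and Runtime Key.

```sql
-- Sketch: recent logs from one Runtime's server container.
-- 'postgresql-cdc-server' is a hypothetical container name (Runtime Key + '-server').
SELECT
  timestamp,
  resource_attributes:"k8s.pod.name"::string AS pod_name,
  resource_attributes:"k8s.container.restart_count"::string AS restart_count,
  value
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minutes, -15, sysdate())
  AND record_type = 'LOG'
  AND resource_attributes:"application" = 'openflow'
  AND resource_attributes:"k8s.container.name" = 'postgresql-cdc-server'
ORDER BY timestamp DESC
LIMIT 100;
```

Attribute names contain dots, so they are wrapped in double quotes in the path expression.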

Scope

Name | Type | Description
name | String | Provider of the metric. One of: runtime for Openflow Connector metrics; github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver for system-level metrics

Scope Example:
{
  "name": "runtime"
}

Record Type

Depending on the type of Openflow telemetry represented by this row, this will be one of:

  • LOG
  • METRIC

Openflow does not collect TRACE records, but that is also a valid type for this column in Snowflake Event Tables.

Record

Optional. This JSON object describes the type of metric represented by this row.

Name | Type | Description
metric | Object | Contains two fields: name for the unique metric produced, typically using dot-delimited namespaces; unit for the value represented by the type, such as byte, nanosecond, and thread. The name and unit values vary widely. For the full list, see Application Metrics below.
metric_type | String | One of: gauge for most Openflow metrics, a snapshot value that can increase or decrease; sum for cumulative metrics like pod CPU time and network IO
value_type | String | The primitive type of the value produced by this metric. One of INT, DOUBLE
aggregation_temporality | String | Optional. Set to cumulative for metrics that are strictly increasing and dependent on previous values, such as pod CPU time and network IO.
is_monotonic | Boolean | Optional. For cumulative metrics, this is true to show that it is strictly increasing within the time series.

Record Example:
{
  "metric": {
    "name": "connection.queued.duration.max",
    "unit": "millisecond"
  },
  "metric_type": "gauge",
  "value_type": "INT"
}
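
Because cumulative metrics report a running total, the change between consecutive samples is often more useful than the raw value. The following is a sketch only, assuming the default event table SNOWFLAKE.TELEMETRY.EVENTS and that each time series can be identified by its pod, container, metric name, and record attributes. That grouping is an assumption for illustration, not a documented contract.

```sql
-- Sketch: per-sample increase of cumulative ("sum") metrics.
SELECT
  timestamp,
  resource_attributes:"k8s.pod.name"::string AS pod_name,
  resource_attributes:"k8s.container.name"::string AS container_name,
  record:metric:name::string AS metric_name,
  value::double AS cumulative_value,
  -- Difference from the previous sample of the same series.
  -- NULL for the first sample in the window; can be negative if a counter resets.
  value::double - LAG(value::double) OVER (
    PARTITION BY
      resource_attributes:"k8s.pod.name"::string,
      resource_attributes:"k8s.container.name"::string,
      record:metric:name::string,
      TO_JSON(record_attributes)
    ORDER BY timestamp
  ) AS increase_since_previous_sample
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minutes, -30, sysdate())
  AND record_type = 'METRIC'
  AND record:metric_type = 'sum'
  AND record:aggregation_temporality = 'cumulative'
ORDER BY pod_name, container_name, metric_name, timestamp;
```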

Record Attributes

Logs

Record attributes for Logs will typically indicate where this log was sourced. For example, logs from an Openflow Runtime named testruntime could have Record Attributes of:

{
  "log.file.path": "/var/log/pods/runtime-testruntime_testruntime-0_66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log",
  "log.iostream": "stdout",
  "logtag": "F"
}

System Metrics

System metrics like CPU usage will typically not set Record Attributes, so this will be null.

Openflow Application Metrics

Record Attributes for Application or “Flow” metrics provide details about the component in the data pipeline that produced the metric. These attributes vary based on the type of component; see Application Metrics. For example, a metric produced by a processor could have Record Attributes of:

{
  "component": "PutSnowpipeStreaming",
  "execution.node": "ALL",
  "group.id": "c052f9d7-7f76-3013-a2c5-d3b064fa7326",
  "id": "c69e2913-22a9-36bb-a159-6a5ed1fb9d63",
  "name": "PutSnowpipeStreaming",
  "type": "processor"
}

Value

This column contains the raw value of the telemetry. For metrics, this will be a numeric value (integer or double). For logs, this will either be a semi-structured string value or a well-formatted JSON string.

Openflow Runtime Logs

Openflow Runtimes emit most logs as JSON, so applying Snowflake’s TRY_PARSE_JSON to the VALUE column allows you to further break this value into the following structured fields:

Name | Type | Description
formattedMessage | String | The actual log message emitted from the Runtime logger.
level | String | One of ERROR, WARN, INFO, DEBUG, TRACE
loggerName | String | The fully qualified class name for the logger. Openflow processors typically use logger names that start with com.snowflake.openflow.runtime.processors. This is useful for viewing logs for a specific processor, controller service, or bundled library.
nanoseconds | Integer | The sub-second part of the time that this log message was created, at nanosecond precision, so it begins with the milliseconds. For example, a nanoseconds value of 111222333 could correspond to a timestamp value of 1749180210111, with the leftmost 3 digits of nanoseconds matching the rightmost 3 digits of timestamp.
threadName | String | Name of the thread handling this call. For example, Timer-Driven Process Thread-7
throwable | JSON Object | null when there is no exception or stack trace for this log message. Otherwise, the stack trace as JSON with fields: className, the exception thrown; message, any message logged with the exception; stepArray, an array of method calls for the stack trace, each including className, fileName, lineNumber, and methodName
timestamp | Integer | Time that this log message was created, represented as milliseconds since the UNIX epoch. For example, 1749180210044 indicates that the log was created at 2025-06-06 03:23:30.044 UTC
mdc | JSON Object | Mapped Diagnostic Context (MDC) providing additional flow-level context for the log entry. Contains the following fields: processGroupId, the unique identifier of the process group; processGroupIdPath, the hierarchical path of process group IDs; processGroupName, the name of the process group; processGroupNamePath, the hierarchical path of process group names; registeredFlowIdentifier, the identifier of the registered flow; registeredFlowVersion, the version of the registered flow. The last two fields are present for all versioned flows, including out-of-the-box Openflow connectors.

Example mdc value:

{
  "processGroupId": "6dc1d98f-019d-1000-ffff-ffffa3ba8a09",
  "processGroupIdPath": "/58385a8b-019d-1000-2a52-9ef1c34b0e5f/6dc1d98f-019d-1000-ffff-ffffa3ba8a09",
  "processGroupName": "latency targets",
  "processGroupNamePath": "/Openflow/latency targets",
  "registeredFlowIdentifier": "sqlserver-multidatabase",
  "registeredFlowVersion": "0.29.0-ebb7a257"
}
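
The structured fields above can be pulled into their own columns once the log value is parsed. The following is a minimal sketch, assuming the default event table SNOWFLAKE.TELEMETRY.EVENTS; the column aliases are illustrative. It converts the millisecond timestamp field with TO_TIMESTAMP_NTZ using a scale of 3, and reads nested fields from mdc and throwable.

```sql
-- Sketch: extract structured fields from Openflow Runtime logs.
SELECT
  timestamp AS event_timestamp,
  parsed_log:level::string AS log_level,
  parsed_log:loggerName::string AS logger,
  -- "timestamp" holds milliseconds since the UNIX epoch, hence a scale of 3
  TO_TIMESTAMP_NTZ(parsed_log:"timestamp"::number, 3) AS log_created_at,
  parsed_log:mdc:processGroupName::string AS process_group_name,
  parsed_log:throwable:className::string AS exception_class,
  parsed_log:formattedMessage::string AS message
FROM (
  SELECT
    timestamp,
    -- Returns NULL for log values that are not valid JSON
    TRY_PARSE_JSON(value) AS parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd(minutes, -30, sysdate())
    AND record_type = 'LOG'
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
)
WHERE parsed_log IS NOT NULL
ORDER BY event_timestamp DESC;
```

Fields that are absent from a given log entry, such as throwable on a log without an exception, come back as NULL rather than causing an error.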

Application Metrics

Note

The following list covers all application metrics available for Openflow Runtimes. Runtimes only emit a subset of metrics relevant to Openflow Connectors to persist in a Snowflake Event Table.

Snowflake’s OpenTelemetry Reporting Task can send some or all metrics to any OTLP destination.

Connection Metrics

Metric Name | Unit | Description
connection.input.bytes | bytes | Size of Items Input
connection.input.count | items | Count of Items Input
connection.output.bytes | bytes | Size of Items Output
connection.output.count | items | Count of Items Output
connection.queued.bytes | bytes | Size of Items Queued
connection.queued.bytes.max | bytes | Max Size of Items Queued
connection.queued.count | items | Count of Items Queued
connection.queued.count.max | items | Max Count of Items Queued
connection.queued.duration.total | milliseconds | Total Duration of Queued Items
connection.queued.duration.max | milliseconds | Max Duration of Queued Items
connection.backpressure.threshold.bytes | bytes | The maximum size of data in bytes that can be queued in this connection before it applies back pressure.
connection.backpressure.threshold.objects | items | The configured maximum number of FlowFiles that can be queued in this connection before it applies back pressure.
connection.loadbalance.status.load_balance_not_configured | binary, 0 or 1 | 1 if the connection does not have a configured load balance setting. Otherwise, 0.
connection.loadbalance.status.load_balance_active | binary, 0 or 1 | 1 if the connection is load balancing across the cluster. Otherwise, 0.
connection.loadbalance.status.load_balance_inactive | binary, 0 or 1 | 1 if the connection is not load balancing across the cluster. Otherwise, 0.

Connection Record Attributes

Each Connection metric includes the following Record Attributes:

Attribute | Description
id | The unique identifier of the connection
name | The user-visible name of the connection
type | The fixed value connection
source.id | The unique identifier of the component that is sending FlowFiles to this connection
source.name | The user-visible name of the component that is sending FlowFiles to this connection
destination.id | The unique identifier of the component that is receiving FlowFiles from this connection
destination.name | The user-visible name of the component that is receiving FlowFiles from this connection
group.id | The unique identifier of the Process Group that contains this Connection
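
One way to combine these metrics is to compare a connection's queued item count against its back pressure threshold. The following is a sketch only. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and that both connection.queued.count and connection.backpressure.threshold.objects are among the metrics your Runtime persists to the event table, which is not guaranteed. The queue_utilization alias is illustrative.

```sql
-- Sketch: how close each connection came to its back pressure threshold.
SELECT
  runtime_key,
  connection_id,
  connection_name,
  max_queued_count,
  threshold_objects,
  -- DIV0 returns 0 instead of an error when the threshold is 0
  DIV0(max_queued_count, threshold_objects) AS queue_utilization
FROM (
  SELECT
    resource_attributes:"k8s.namespace.name"::string AS runtime_key,
    record_attributes:id::string AS connection_id,
    record_attributes:name::string AS connection_name,
    MAX(IFF(record:metric:name::string = 'connection.queued.count',
            TO_NUMBER(value), NULL)) AS max_queued_count,
    MAX(IFF(record:metric:name::string = 'connection.backpressure.threshold.objects',
            TO_NUMBER(value), NULL)) AS threshold_objects
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd(minutes, -30, sysdate())
    AND record_type = 'METRIC'
    AND record:metric:name::string IN (
      'connection.queued.count',
      'connection.backpressure.threshold.objects'
    )
  GROUP BY 1, 2, 3
)
ORDER BY queue_utilization DESC;
```

If either metric is missing for a connection, the corresponding column and the ratio are NULL.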

Input and Output Port Metrics

Input Ports and Output Ports are technically two separate types of components. For consistency, they share the same metrics and attributes, except for the type attribute, which indicates whether the component is an input port or an output port.

Metric Name | Unit | Description
port.thread.count.active | threads | Number of Active Threads
port.bytes.received | bytes | Number of Bytes Received
port.bytes.sent | bytes | Number of Bytes Sent
port.flowfiles.received | flowfiles | Number of FlowFiles Received
port.flowfiles.sent | flowfiles | Number of FlowFiles Sent
port.input.bytes | bytes | Size of Items Input
port.input.count | items | Count of Items Input
port.output.bytes | bytes | Size of Items Output
port.output.count | items | Count of Items Output

Input and Output Port Record Attributes

Each Port metric includes the following Record Attributes:

Attribute | Description
id | The unique identifier of the port
name | The user-visible name of the port
type | One of port-input or port-output
group.id | The unique identifier of the Process Group that contains this Port

Process Group Metrics

Metric Name | Unit | Description
processgroup.thread.count.active | threads | Number of Active Threads
processgroup.thread.count.stateless | threads | Number of Stateless Threads
processgroup.thread.count.terminated | threads | Number of Terminated Threads
processgroup.bytes.read | bytes | Number of Bytes Read
processgroup.bytes.received | bytes | Number of Bytes Received
processgroup.bytes.transferred | bytes | Number of Bytes Transferred
processgroup.bytes.sent | bytes | Number of Bytes Sent
processgroup.bytes.written | bytes | Number of Bytes Written
processgroup.flowfiles.received | flowfiles | Number of FlowFiles Received
processgroup.flowfiles.sent | flowfiles | Number of FlowFiles Sent
processgroup.flowfiles.transferred | flowfiles | Number of FlowFiles Transferred
processgroup.input.count | items | Number of Items Input
processgroup.input.content.size | bytes | Size of Items Input
processgroup.output.count | items | Number of Items Output
processgroup.output.content.size | bytes | Size of Items Output
processgroup.queued.count | items | Number of Items Queued
processgroup.queued.content.size | bytes | Size of Items Queued
processgroup.time.processing | nanoseconds | Time Spent Processing

Process Group Record Attributes

Each Process Group metric includes the following Record Attributes:

Attribute | Description
id | The unique identifier of the Process Group
name | The user-visible name of the Process Group
type | The fixed value process-group
tree.level | The depth of the Process Group, relative to the root process group of the flow. Process Groups at the highest level of the flow will have a tree.level of 1

Processor Metrics

Metric Name | Unit | Description
processor.thread.count.active | thread | Number of Active Threads
processor.thread.count.terminated | thread | Number of Terminated Threads
processor.time.lineage.average | nanosecond | Average Lineage Duration
processor.invocations | invocations | Number of Invocations
processor.bytes.read | byte | Number of Bytes Read
processor.bytes.received | byte | Number of Bytes Received
processor.bytes.sent | byte | Number of Bytes Sent
processor.bytes.written | byte | Number of Bytes Written
processor.flowfiles.received | flowfiles | Number of FlowFiles Received
processor.flowfiles.removed | flowfiles | Number of FlowFiles Removed
processor.flowfiles.sent | flowfiles | Number of FlowFiles Sent
processor.input.count | item | Number of Items Input
processor.input.content.size | bytes | Size of Items Input
processor.output.count | item | Number of Items Output
processor.output.content.size | byte | Size of Items Output
processor.time.processing | nanosecond | Time Spent Processing
processor.run.status.running | binary, 0 or 1 | 1 if running; 0 otherwise
processor.run.status.stopped | binary, 0 or 1 | 1 if stopped; 0 otherwise
processor.run.status.validating | binary, 0 or 1 | 1 if validating; 0 otherwise
processor.run.status.invalid | binary, 0 or 1 | 1 if invalid; 0 otherwise
processor.run.status.disabled | binary, 0 or 1 | 1 if disabled; 0 otherwise
processor.counter | count | Value of the counter

Processor Record Attributes

Each Processor metric includes the following Record Attributes:

Attribute | Description
id | The unique identifier of the processor
name | The user-visible and user-editable name of the Processor
type | The fixed value processor
component | The immutable class name of the processor.
execution.node | Either ALL or PRIMARY, depending on how this Processor is configured to run
group.id | The unique identifier of the Process Group that contains this Processor

Additional Attributes for Counters

In addition to the standard Processor attributes above, processor.counter metrics include the following:

Attribute | Description
type | The fixed value counter
counter | The user- or system-generated name of the counter

Remote Process Group Metrics

Metric Name | Unit | Description
remoteprocessgroup.thread.count.active | threads | Number of Active Threads
remoteprocessgroup.remote.port.count.active | ports | Number of Active Remote Ports
remoteprocessgroup.remote.port.count.inactive | ports | Number of Inactive Remote Ports
remoteprocessgroup.duration.lineage.average | nanoseconds | Average Lineage Duration
remoteprocessgroup.refresh.age | milliseconds | Time since last refresh
remoteprocessgroup.received.count | items | Number of Received Items
remoteprocessgroup.received.content.size | bytes | Size of Received Items
remoteprocessgroup.sent.count | items | Number of Sent Items
remoteprocessgroup.sent.content.size | bytes | Size of Sent Items
remoteprocessgroup.transmission.status.transmitting | binary, 0 or 1 | 1 if the Remote Process Group is transmitting. Otherwise, 0.
remoteprocessgroup.transmission.status.nottransmitting | binary, 0 or 1 | 0 if the Remote Process Group is transmitting. Otherwise, 1.

Remote Process Group Record Attributes

Each Remote Process Group metric includes the following Record Attributes:

Attribute | Description
id | The unique identifier of the remote process group
name | The user-visible name of the Remote Process Group
group.id | The unique identifier of the Process Group that contains this Remote Process Group
authorization.issue | The Authorization used to access the Remote Process Group
target.uri | The URI of the Remote Process Group
type | The fixed value remote-process-group

JVM Metrics

Metric Name | Unit | Description
jvm.memory.heap.used | bytes | The amount of memory currently occupied by objects on the JVM Heap
jvm.memory.heap.committed | bytes | The amount of memory guaranteed to be available for use by the JVM Heap
jvm.memory.heap.max | bytes | Maximum amount of memory allocated for the JVM Heap
jvm.memory.heap.init | bytes | Initial amount of memory allocated for the JVM Heap
jvm.memory.heap.usage | percentage | JVM Heap Usage
jvm.memory.non-heap.usage | percentage | JVM Non-Heap Usage
jvm.memory.total.init | bytes | Initial amount of memory allocated for the JVM
jvm.memory.total.used | bytes | Current amount of memory used by the JVM
jvm.memory.total.max | bytes | Maximum amount of memory that can be used by the JVM
jvm.memory.total.committed | bytes | The amount of memory guaranteed to be available for use by the JVM
jvm.threads.count | threads | Number of live threads
jvm.threads.deadlocks | threads | JVM Thread Deadlocks
jvm.threads.daemon.count | threads | Number of live daemon threads
jvm.uptime | seconds | Number of seconds the JVM process has been running
jvm.file.descriptor.usage | percentage | Percentage of available file descriptors currently in use.
jvm.gc.G1-Concurrent-GC.runs | runs | Total number of times that the G1 Concurrent Garbage Collection has run
jvm.gc.G1-Concurrent-GC.time | milliseconds | Total amount of time that the G1 Concurrent Garbage Collection has been running
jvm.gc.G1-Young-Generation.runs | runs | Total number of times that the G1 Young Generation has run
jvm.gc.G1-Young-Generation.time | milliseconds | Total amount of time that the G1 Young Generation has been running
jvm.gc.G1-Old-Generation.runs | runs | Total number of times that the G1 Old Generation has run
jvm.gc.G1-Old-Generation.time | milliseconds | Total amount of time that the G1 Old Generation has been running

JVM Record Attributes

JVM metrics do not provide Record Attributes.

CPU Metrics

Metric Name | Unit | Description
cores.available | cores | The number of available cores for the Runtime
cores.load | percentage | Either the system load average or -1 if it is not available

CPU Record Attributes

Attribute | Description
id | The fixed value cpu
name | The name of the operating system
architecture | The architecture of the operating system
version | The version of the operating system

Storage Metrics

Metric Name | Unit | Description
storage.free | bytes | The amount of free storage for a given repository
storage.used | bytes | The amount of used storage for a given repository

Storage Record Attributes

Attribute | Description
id | The unique identifier of the storage repository
name | Same as id and provided for consistency
storage.type | One of flowfile, content, or provenance

Example Queries

The following queries are examples to get you started with Openflow Telemetry.

All queries assume that Openflow is configured to send telemetry to the default Event Table of SNOWFLAKE.TELEMETRY.EVENTS. If your Snowflake Account or Openflow Deployment is configured with a different Event Table, substitute that table name where you see SNOWFLAKE.TELEMETRY.EVENTS.

Find Stuck FlowFiles

This query returns connections whose FlowFiles have been queued for longer than a threshold, which indicates that they may be stuck and require intervention. Adjust the 30-minute threshold as needed for your use case.

SELECT
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  record_attributes:name as Connection_Name,
  record_attributes:id as Connection_ID,
  -- The metric is reported in milliseconds; convert it to minutes
  MAX(TO_NUMBER(value / 60 / 1000)) as Max_Queued_File_Minutes
FROM snowflake.telemetry.events
WHERE true
  AND record_type = 'METRIC'
  AND record:metric:name = 'connection.queued.duration.max'
  AND timestamp > dateadd(minutes, -30, sysdate())
GROUP BY 1, 2, 3, 4
-- Keep only connections above the queued-time threshold
HAVING MAX(TO_NUMBER(value / 60 / 1000)) > 30
ORDER BY Max_Queued_File_Minutes DESC;

Find Error Logs for Openflow Runtimes

This query returns ERROR-level logs emitted by Openflow Runtimes in the last 30 minutes.

SELECT
  timestamp,
  Deployment_ID,
  Runtime_Key,
  parsed_log:level as log_level,
  parsed_log:loggerName as logger,
  parsed_log:formattedMessage as message,
  parsed_log
FROM (
  SELECT
    timestamp,
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    -- Returns NULL for log values that are not valid JSON
    TRY_PARSE_JSON(value) as parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd(minutes, -30, sysdate())
    AND record_type = 'LOG'
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
)
WHERE parsed_log:level = 'ERROR'
-- Sort in the outer query; ordering inside a subquery is not guaranteed to carry through
ORDER BY timestamp DESC;
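
When there are many error rows, it can help to first see which loggers produce them. The following is a minimal sketch under the same assumptions as the query above (default event table, 30-minute window). The error_count, first_seen, and last_seen aliases are illustrative.

```sql
-- Sketch: count ERROR logs per Runtime and logger.
SELECT
  runtime_key,
  parsed_log:loggerName::string AS logger,
  COUNT(*) AS error_count,
  MIN(timestamp) AS first_seen,
  MAX(timestamp) AS last_seen
FROM (
  SELECT
    timestamp,
    resource_attributes:"k8s.namespace.name"::string AS runtime_key,
    TRY_PARSE_JSON(value) AS parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd(minutes, -30, sysdate())
    AND record_type = 'LOG'
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
)
WHERE parsed_log:level = 'ERROR'
GROUP BY 1, 2
ORDER BY error_count DESC;
```

A logger name from this result can then be used as an additional filter on loggerName to read the individual messages.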

Find Running and Non-Running Processors

Some flows expect that all processors are in a “running” state, even if they are not actively processing data.

This query returns the running status reported for each processor. A value of 1 means the processor is running; a value of 0 means it is in another state, such as:

  • stopped
  • invalid
  • disabled

SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  record_attributes:component as Processor,
  record_attributes:id as Processor_ID,
  TO_NUMBER(value) as Running
FROM snowflake.telemetry.events
WHERE true
  AND record:metric:name = 'processor.run.status.running'
  AND record_type = 'METRIC'
  AND timestamp > dateadd(minutes, -30, sysdate());
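
To list only the processors that are currently not running, one option is to keep the most recent sample per processor and pod and filter on a value of 0. This is a sketch, assuming the default event table SNOWFLAKE.TELEMETRY.EVENTS and that a processor is uniquely identified by its id within a pod.

```sql
-- Sketch: processors whose latest reported running status is 0.
SELECT
  timestamp,
  resource_attributes:"k8s.namespace.name"::string AS runtime_key,
  resource_attributes:"k8s.pod.name"::string AS runtime_pod,
  record_attributes:name::string AS processor_name,
  record_attributes:id::string AS processor_id,
  TO_NUMBER(value) AS running
FROM snowflake.telemetry.events
WHERE true
  AND record:metric:name = 'processor.run.status.running'
  AND record_type = 'METRIC'
  AND timestamp > dateadd(minutes, -30, sysdate())
-- Keep the newest row per processor and pod, and only if it reports "not running"
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY
      resource_attributes:"k8s.pod.name"::string,
      record_attributes:id::string
    ORDER BY timestamp DESC
  ) = 1
  AND TO_NUMBER(value) = 0;
```

Swapping the metric name for processor.run.status.invalid or processor.run.status.disabled, and the final filter for a value of 1, narrows the result to one specific state.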

Find High CPU Usage for Openflow Runtimes

Slow data flows or reduced throughput may be the result of a bottleneck on the CPU. Openflow Runtimes scale up automatically, within the minimum and maximum number of nodes you have configured.

If an Openflow Runtime is using its maximum number of nodes and CPU usage remains high, consider:

  1. Increasing the maximum number of nodes allocated to the Runtime
  2. Troubleshooting the Connector or flow to identify the bottleneck

Snowsight Charts provide an easy way to visualize query results for CPU usage over time.

SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  resource_attributes:"k8s.pod.name" as Runtime_Pod,
  TO_NUMBER(value, 10, 3) * 100 as CPU_Usage_Percentage
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minute, -30, sysdate())
  AND record_type = 'METRIC'
  AND record:metric:name ilike 'container.cpu.usage'
  AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
  AND resource_attributes:"k8s.container.name" ilike '%-server'
ORDER BY timestamp desc, CPU_Usage_Percentage desc;
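
For charting, raw samples can be noisy. The following sketch averages CPU usage per pod into 5-minute buckets with TIME_SLICE. It makes the same assumptions as the query above (default event table, container.cpu.usage reported as a fraction); the bucket size and the 6-hour window are arbitrary choices.

```sql
-- Sketch: average and peak CPU usage per Runtime pod in 5-minute buckets.
SELECT
  TIME_SLICE(timestamp, 5, 'MINUTE') AS bucket_start,
  resource_attributes:"k8s.namespace.name"::string AS runtime_key,
  resource_attributes:"k8s.pod.name"::string AS runtime_pod,
  AVG(value::double) * 100 AS avg_cpu_usage_percentage,
  MAX(value::double) * 100 AS max_cpu_usage_percentage
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(hours, -6, sysdate())
  AND record_type = 'METRIC'
  AND record:metric:name ilike 'container.cpu.usage'
  AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
  AND resource_attributes:"k8s.container.name" ilike '%-server'
GROUP BY 1, 2, 3
ORDER BY bucket_start DESC, avg_cpu_usage_percentage DESC;
```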