Monitor Openflow
This topic describes how to monitor the state of Openflow and troubleshoot problems.
Accessing Openflow logs
Snowflake sends Openflow logs to the event table you configured when you set up Openflow.
Snowflake recommends that you include a timestamp in the WHERE clause of event table queries. This is particularly important because of the potential volume of data generated by various Snowflake components. By applying filters, you can retrieve a smaller subset of data, which improves query performance.
To get started quickly with Openflow’s telemetry, see Example Queries below.
Openflow Telemetry Schema
For information about the event table columns, see Event table columns.
The following sections describe how Openflow structures telemetry in an Event Table.
Resource Attributes
Describes the event metadata set by Openflow. For general information on other types of resource attributes see RESOURCE_ATTRIBUTES column in the Event Table columns documentation.
Name | Type | Description |
---|---|---|
application | String | The fixed value |
cloud.service.provider | String | One of |
container.id | String | Unique identifier of the container |
container.image.name | String | Fully qualified name of the container image. Openflow Runtime containers will include the path to the local container registry. For example, |
container.image.tag | String | Version of the container image |
k8s.container.name | String | The name of the K8s container. Openflow Runtime containers will start with the “Runtime Key” and end with For example, an Openflow Runtime named “PostgreSQL CDC” with a Runtime Key of postgresql-cdc, so it would have container names of: |
k8s.container.restart_count | Numeric String | The number of times this container has restarted since it was created. |
k8s.namespace.name | String | K8s namespace of the pod or container, starting with |
k8s.node.name | String | The internal domain name of the EKS node hosting the pod / container, or the EKS node itself. For example, ip-10-12-13-144.us-west-2.compute.internal |
k8s.pod.name | String | The name of the K8s pod. Openflow Runtime pods will start with the “Runtime Key” and end with a numeric identifier for each pod replica. This number can grow up to the “Max Nodes” set for the Runtime, indexed at 0. For example, an Openflow Runtime named “PostgreSQL CDC” with a Runtime Key of postgresql-cdc and 3 nodes would have pod names of: |
k8s.pod.start_time | ISO 8601 Date String | Timestamp that the pod was started |
k8s.pod.uid | UUID String | Unique identifier of the pod within the cluster |
openflow.dataplane.id | UUID String | The unique identifier of the Openflow Deployment, matching the “ID” shown in the Snowflake Openflow UI through Deployment > View Details. |
- Resource Attributes Example:
{ "application": "openflow", "cloud.service.provider": "aws", "k8s.container.name": "pg-dev-server", "k8s.container.restart_count": "0", "k8s.namespace.name": "runtime-pg-dev", "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal", "k8s.pod.name": "pg-dev-0", "k8s.pod.start_time": "2025-04-25T22:14:29Z", "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6", "k8s.statefulset.name": "pg-dev", "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a" }
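When telemetry rows are exported and post-processed outside Snowflake, the namespace and pod name can be split back into a Runtime Key and a replica index. The following Python sketch is one way to do that. It assumes, based only on the example values shown above, that Runtime namespaces carry a `runtime-` prefix and that pod names follow `<Runtime Key>-<index>`. The helper name `describe_runtime_pod` is hypothetical and not part of any Openflow API.

```python
import json


def describe_runtime_pod(resource_attributes: str) -> dict:
    """Extract the Runtime Key and pod replica index from a RESOURCE_ATTRIBUTES JSON string.

    Assumption (inferred from the example values, not a documented contract):
    namespaces look like "runtime-<key>" and pod names like "<key>-<index>".
    """
    attrs = json.loads(resource_attributes)
    namespace = attrs["k8s.namespace.name"]
    pod_name = attrs["k8s.pod.name"]

    runtime_key = namespace.removeprefix("runtime-")  # requires Python 3.9+
    # The replica index is the numeric suffix after the last hyphen.
    prefix, _, suffix = pod_name.rpartition("-")
    matches = prefix == runtime_key and suffix.isdigit()
    replica_index = int(suffix) if matches else None

    return {
        "deployment_id": attrs.get("openflow.dataplane.id"),
        "runtime_key": runtime_key,
        "replica_index": replica_index,
        # restart_count arrives as a numeric string, so convert it explicitly.
        "restart_count": int(attrs.get("k8s.container.restart_count", "0")),
    }


example = json.dumps({
    "k8s.namespace.name": "runtime-pg-dev",
    "k8s.pod.name": "pg-dev-0",
    "k8s.container.restart_count": "0",
    "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a",
})
info = describe_runtime_pod(example)
```

Pods that do not match the assumed naming pattern get a `replica_index` of `None` rather than raising an error, so the helper can run over mixed telemetry.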
Scope
Name | Type | Description |
---|---|---|
name | String | Provider of the metric. One of: |
- Scope Example:
{ "name": "runtime" }
Record Type
Depending on the type of Openflow telemetry represented by this row, this will be one of:
- LOG
- METRIC
Openflow does not collect TRACE records, but that is also a valid type for this column in Snowflake Event Tables.
Record
Optional. This JSON object describes the type of metric represented by this row.
Name | Type | Description |
---|---|---|
metric | Object | Contains two fields, name and unit. The name and unit values vary widely. For the full list, see Application Metrics below. |
metric_type | String | One of: |
value_type | String | The primitive type of the value produced by this metric. One of: |
aggregation_temporality | String | Optional. Set to cumulative for metrics that are strictly increasing and dependent on previous values, such as pod CPU time and network IO. |
is_monotonic | Boolean | Optional. For cumulative metrics, this is true to show that it is strictly increasing within the time series. |
- Record Example:
{ "metric": { "name": "connection.queued.duration.max", "unit": "millisecond" }, "metric_type": "gauge", "value_type": "INT" }
Record Attributes
Logs
Record attributes for Logs will typically indicate where this log was sourced. For example, logs from an Openflow Runtime named testruntime could have Record Attributes of:
{ "log.file.path": "/var/log/pods/runtime-testruntime_testruntime-0_66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log", "log.iostream": "stdout", "logtag": "F" }
System Metrics
System metrics like CPU usage will typically not set Record Attributes, so this will be null.
Openflow Application Metrics
Record Attributes for Application or “Flow” metrics provide details about the component in the data pipeline that produced the metric. These attributes vary based on the type of component; see Application Metrics. For example:
{ "component": "PutSnowpipeStreaming", "execution.node": "ALL", "group.id": "c052f9d7-7f76-3013-a2c5-d3b064fa7326", "id": "c69e2913-22a9-36bb-a159-6a5ed1fb9d63", "name": "PutSnowpipeStreaming", "type": "processor" }
Value
This column contains the raw value of the telemetry. For metrics, this will be a numeric value (integer or double). For logs, this will either be a semi-structured string value or a well-formatted JSON string.
Openflow Runtime Logs
Openflow Runtimes emit most logs as JSON, so applying Snowflake’s TRY_PARSE_JSON to the VALUE column allows you to further break this value into the following structured fields:
Name | Type | Description |
---|---|---|
formattedMessage | String | The actual log message emitted from the Runtime logger. |
level | String | One of: |
loggerName | String | The fully qualified classname for the logger. Openflow processors will typically use logger names that start with This is useful to view logs for a specific processor, controller service, or bundled library. |
nanoseconds | Integer | Nanosecond-level time that this log message was created, starting at milliseconds. For example, a nanosecond value of 111222333 could correspond to a timestamp value of 1749180210111 with the leftmost 3 digits of nanosecond matching the right-most 3 digits of timestamp. |
threadName | String | Name of the thread handling this call. For example, |
throwable | JSON Object | |
timestamp | Integer | Time that this log message was created, represented as milliseconds since the UNIX epoch. For example, 1749180210044 indicates that the log was created at 2025-06-06 03:23:30.044 UTC |
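The timestamp and nanoseconds fields can be combined when logs are processed outside Snowflake. The Python sketch below parses a log value the way TRY_PARSE_JSON would (returning nothing for non-JSON lines) and builds a full-precision creation time. It assumes that nanoseconds holds only the sub-second part of the time, which is an interpretation of the table above rather than a documented guarantee. The function names and the sample log line are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def try_parse_json(value):
    """Return the parsed JSON, or None when the value is not valid JSON."""
    try:
        return json.loads(value)
    except (TypeError, ValueError):
        return None


def log_time(parsed_log: dict):
    """Return (UTC datetime at millisecond precision, epoch time in nanoseconds)."""
    millis = parsed_log["timestamp"]
    nanos = parsed_log["nanoseconds"]
    # Assumed relationship: the leading three digits of nanoseconds repeat
    # the millisecond part of timestamp.
    if nanos // 1_000_000 != millis % 1000:
        raise ValueError("timestamp and nanoseconds disagree")
    created_at = EPOCH + timedelta(milliseconds=millis)
    epoch_nanos = (millis // 1000) * 1_000_000_000 + nanos
    return created_at, epoch_nanos


# Hypothetical log line using the timestamp/nanoseconds pair from the table.
raw_value = '{"level": "ERROR", "timestamp": 1749180210111, "nanoseconds": 111222333}'
parsed = try_parse_json(raw_value)
created_at, epoch_nanos = log_time(parsed)
```

Integer arithmetic is used throughout because converting epoch milliseconds through floating point can lose sub-millisecond digits.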
Application Metrics
Note
The following list covers all application metrics available for Openflow Runtimes. Runtimes only emit a subset of metrics relevant to Openflow Connectors to persist in a Snowflake Event Table.
Snowflake’s OpenTelemetry Reporting Task can send some or all metrics to any OTLP destination.
Connection Metrics
Metric Name | Unit | Description |
---|---|---|
connection.input.bytes | bytes | Size of Items Input |
connection.input.count | items | Count of Items Input |
connection.output.bytes | bytes | Size of Items Output |
connection.output.count | items | Count of Items Output |
connection.queued.bytes | bytes | Size of Items Queued |
connection.queued.bytes.max | bytes | Max Size of Items Queued |
connection.queued.count | items | Count of Items Queued |
connection.queued.count.max | items | Max Count of Items Queued |
connection.queued.duration.total | milliseconds | Total Duration of Queued Items |
connection.queued.duration.max | milliseconds | Max Duration of Queued Items |
connection.backpressure.threshold.bytes | bytes | The maximum size of data in bytes that can be queued in this connection before it applies back pressure. |
connection.backpressure.threshold.objects | items | The configured maximum number of FlowFiles that can be queued in this connection before it applies back pressure. |
connection.loadbalance.status.load_balance_not_configured | binary, 0 or 1 | 1 if the connection does not have a configured load balance setting. Otherwise, 0. |
connection.loadbalance.status.load_balance_active | binary, 0 or 1 | 1 if the connection is load balancing across the cluster. Otherwise, 0. |
connection.loadbalance.status.load_balance_inactive | binary, 0 or 1 | 1 if the connection is not load balancing across the cluster. Otherwise, 0. |
Connection Record Attributes
Each Connection metric includes the following Record Attributes:
Attribute | Description |
---|---|
id | The unique identifier of the connection |
name | The user-visible name of the connection |
type | The fixed value |
source.id | The unique identifier of the component that is sending FlowFiles to this connection |
source.name | The user-visible name of the component that is sending FlowFiles to this connection |
destination.id | The unique identifier of the component that is receiving FlowFiles from this connection |
destination.name | The user-visible name of the component that is receiving FlowFiles from this connection |
group.id | The unique identifier of the Process Group that contains this Connection |
Input and Output Port Metrics
Input Ports and Output Ports are technically two separate types of components. For consistency, metrics and attributes for Input and Output Ports are the same, with the exception of the type attribute that indicates whether it is an input port or an output port.
Metric Name | Unit | Description |
---|---|---|
port.thread.count.active | threads | Number of Active Threads |
port.bytes.received | bytes | Number of Bytes Received |
port.bytes.sent | bytes | Number of Bytes Sent |
port.flowfiles.received | flowfiles | Number of FlowFiles Received |
port.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
port.input.bytes | bytes | Size of Items Input |
port.input.count | items | Count of Items Input |
port.output.bytes | bytes | Size of Items Output |
port.output.count | items | Count of Items Output |
Input and Output Port Record Attributes
Each Port metric includes the following Record Attributes:
Attribute | Description |
---|---|
id | The unique identifier of the port |
name | The user-visible name of the port |
type | One of |
group.id | The unique identifier of the Process Group that contains this Port |
Process Group Metrics
Metric Name | Unit | Description |
---|---|---|
processgroup.thread.count.active | threads | Number of Active Threads |
processgroup.thread.count.stateless | threads | Number of Stateless Threads |
processgroup.thread.count.terminated | threads | Number of Terminated Threads |
processgroup.bytes.read | bytes | Number of Bytes Read |
processgroup.bytes.received | bytes | Number of Bytes Received |
processgroup.bytes.transferred | bytes | Number of Bytes Transferred |
processgroup.bytes.sent | bytes | Number of Bytes Sent |
processgroup.bytes.written | bytes | Number of Bytes Written |
processgroup.flowfiles.received | flowfiles | Number of FlowFiles Received |
processgroup.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
processgroup.flowfiles.transferred | flowfiles | Number of FlowFiles Transferred |
processgroup.input.count | items | Number of Items Input |
processgroup.input.content.size | bytes | Size of Items Input |
processgroup.output.count | items | Number of Items Output |
processgroup.output.content.size | bytes | Size of Items Output |
processgroup.queued.count | items | Number of Items Queued |
processgroup.queued.content.size | bytes | Size of Items Queued |
processgroup.time.processing | nanoseconds | Time Spent Processing |
Process Group Record Attributes
Each Process Group metric includes the following Record Attributes:
Attribute | Description |
---|---|
id | The unique identifier of the Process Group |
name | The user-visible name of the Process Group |
type | The fixed value |
tree.level | The depth of the Process Group, relative to the root process group of the flow. Process Groups at the highest level of the flow will have a tree.level of 1 |
Processor Metrics
Metric Name | Unit | Description |
---|---|---|
processor.thread.count.active | thread | Number of Active Threads |
processor.thread.count.terminated | thread | Number of Terminated Threads |
processor.time.lineage.average | nanosecond | Average Lineage Duration |
processor.invocations | invocations | Number of Invocations |
processor.bytes.read | byte | Number of Bytes Read |
processor.bytes.received | byte | Number of Bytes Received |
processor.bytes.sent | byte | Number of Bytes Sent |
processor.bytes.written | byte | Number of Bytes Written |
processor.flowfiles.received | flowfiles | Number of FlowFiles Received |
processor.flowfiles.removed | flowfiles | Number of FlowFiles Removed |
processor.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
processor.input.count | item | Number of Items Input |
processor.input.content.size | bytes | Size of Items Input |
processor.output.count | item | Number of Items Output |
processor.output.content.size | byte | Size of Items Output |
processor.time.processing | nanosecond | Time Spent Processing |
processor.run.status.running | binary, 0 or 1 | 1 if running; 0 otherwise |
processor.run.status.stopped | binary, 0 or 1 | 1 if stopped; 0 otherwise |
processor.run.status.validating | binary, 0 or 1 | 1 if validating; 0 otherwise |
processor.run.status.invalid | binary, 0 or 1 | 1 if invalid; 0 otherwise |
processor.run.status.disabled | binary, 0 or 1 | 1 if disabled; 0 otherwise |
processor.counter | count | Value of the counter |
Processor Record Attributes
Each Processor metric includes the following Record Attributes:
Attribute | Description |
---|---|
id | The unique identifier of the processor |
name | The user-visible and user-editable name of the Processor |
type | The fixed value |
component | The immutable class name of the processor. |
execution.node | Either |
group.id | The unique identifier of the Process Group that contains this Processor |
Additional Attributes for Counters
In addition to the standard Processor attributes above, processor.counter metrics include the following:
Attribute | Description |
---|---|
type | The fixed value |
counter | The user- or system-generated name of the counter |
Remote Process Group Metrics
Metric Name | Unit | Description |
---|---|---|
remoteprocessgroup.thread.count.active | threads | Number of Active Threads |
remoteprocessgroup.remote.port.count.active | ports | Number of Active Remote Ports |
remoteprocessgroup.remote.port.count.inactive | ports | Number of Inactive Remote Ports |
remoteprocessgroup.duration.lineage.average | nanoseconds | Average Lineage Duration |
remoteprocessgroup.refresh.age | milliseconds | Time since last refresh |
remoteprocessgroup.received.count | items | Number of Received Items |
remoteprocessgroup.received.content.size | bytes | Size of Received Items |
remoteprocessgroup.sent.count | items | Number of Sent Items |
remoteprocessgroup.sent.content.size | bytes | Size of Sent Items |
remoteprocessgroup.transmission.status.transmitting | binary, 0 or 1 | 1 if the Remote Process Group is transmitting. Otherwise, 0. |
remoteprocessgroup.transmission.status.nottransmitting | binary, 0 or 1 | 0 if the Remote Process Group is transmitting. Otherwise, 1. |
Remote Process Group Record Attributes
Each Remote Process Group metric includes the following Record Attributes:
Attribute | Description |
---|---|
id | The unique identifier of the remote process group |
name | The user-visible name of the Remote Process Group |
group.id | The unique identifier of the Process Group that contains this Remote Process Group |
authorization.issue | The Authorization used to access the Remote Process Group |
target.uri | The URI of the Remote Process Group |
type | The fixed value |
JVM Metrics
Metric Name | Unit | Description |
---|---|---|
jvm.memory.heap.used | bytes | The amount of memory currently occupied by objects on the JVM Heap |
jvm.memory.heap.committed | bytes | The amount of memory guaranteed to be available for use by the JVM Heap |
jvm.memory.heap.max | bytes | Maximum amount of memory allocated for the JVM Heap |
jvm.memory.heap.init | bytes | Initial amount of memory allocated for the JVM Heap |
jvm.memory.heap.usage | percentage | JVM Heap Usage |
jvm.memory.non-heap.usage | percentage | JVM Non-Heap Usage |
jvm.memory.total.init | bytes | Initial amount of memory allocated for the JVM |
jvm.memory.total.used | bytes | Current amount of memory used by the JVM |
jvm.memory.total.max | bytes | Maximum amount of memory that can be used by the JVM |
jvm.memory.total.committed | bytes | The amount of memory guaranteed to be available for use by the JVM |
jvm.threads.count | threads | Number of live threads |
jvm.threads.deadlocks | threads | JVM Thread Deadlocks |
jvm.threads.daemon.count | threads | Number of live daemon threads |
jvm.uptime | seconds | Number of seconds the JVM process has been running |
jvm.file.descriptor.usage | percentage | Percentage of available file descriptors currently in use. |
jvm.gc.G1-Concurrent-GC.runs | runs | Total number of times that the G1 Concurrent Garbage Collection has run |
jvm.gc.G1-Concurrent-GC.time | milliseconds | Total amount of time that the G1 Concurrent Garbage Collection has been running |
jvm.gc.G1-Young-Generation.runs | runs | Total number of times that the G1 Young Generation has run |
jvm.gc.G1-Young-Generation.time | milliseconds | Total amount of time that the G1 Young Generation has been running |
jvm.gc.G1-Old-Generation.runs | runs | Total number of times that the G1 Old Generation has run |
jvm.gc.G1-Old-Generation.time | milliseconds | Total amount of time that the G1 Old Generation has been running |
JVM Record Attributes
JVM metrics do not provide Record Attributes.
CPU Metrics
Metric Name | Unit | Description |
---|---|---|
cores.available | cores | The number of available cores for the Runtime |
cores.load | percentage | Either the system load average or -1 if it is not available |
CPU Record Attributes
Attribute | Description |
---|---|
id | The fixed value |
name | The name of the operating system |
architecture | The architecture of the operating system |
version | The version of the operating system |
Storage Metrics
Metric Name | Unit | Description |
---|---|---|
storage.free | bytes | The amount of free storage for a given repository |
storage.used | bytes | The amount of used storage for a given repository |
Storage Record Attributes
Attribute | Description |
---|---|
id | The unique identifier of the storage repository |
name | Same as id and provided for consistency |
storage.type | One of |
Example Queries
The following queries are examples to get you started with Openflow Telemetry.
All queries assume that Openflow is configured to send telemetry to the default Event Table of SNOWFLAKE.TELEMETRY.EVENTS. If your Snowflake Account or Openflow Deployment is configured with a different Event Table, substitute that table name where you see SNOWFLAKE.TELEMETRY.EVENTS.
Find Stuck FlowFiles
This query returns connections with FlowFiles that have been queued longer than a threshold, indicating that they may be stuck and require intervention. Adjust the 30-minute threshold as needed for your use case.
SELECT * FROM (
  SELECT
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    record_attributes:name as Connection_Name,
    record_attributes:id as Connection_ID,
    -- The metric is reported in milliseconds; convert it to minutes
    MAX(TO_NUMBER(value / 60 / 1000)) as Max_Queued_File_Minutes
  FROM snowflake.telemetry.events
  WHERE true
    AND record_type = 'METRIC'
    AND record:metric:name = 'connection.queued.duration.max'
    AND timestamp > dateadd(minutes, -30, sysdate())
  GROUP BY 1, 2, 3, 4
)
WHERE Max_Queued_File_Minutes > 30
-- Sort in the outer query; ordering inside a subquery is not guaranteed to be preserved
ORDER BY Max_Queued_File_Minutes DESC;
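The aggregation in this query can also be reproduced on exported rows. The Python sketch below applies the same logic (maximum queued duration per connection, converted from milliseconds to minutes, then filtered by a threshold) to an in-memory list. The function name and the sample data are hypothetical. Unlike the SQL, which rounds through TO_NUMBER, this version keeps fractional minutes.

```python
from collections import defaultdict

THRESHOLD_MINUTES = 30


def stuck_connections(rows, threshold_minutes=THRESHOLD_MINUTES):
    """Return {connection_id: max queued minutes} for connections over the threshold.

    rows: iterable of (connection_id, queued_duration_max_ms) pairs, one per
    connection.queued.duration.max sample.
    """
    max_ms = defaultdict(int)
    for connection_id, duration_ms in rows:
        max_ms[connection_id] = max(max_ms[connection_id], duration_ms)
    return {
        connection_id: ms / 60 / 1000
        for connection_id, ms in max_ms.items()
        if ms / 60 / 1000 > threshold_minutes
    }


# Hypothetical samples: connection "a" peaked at 45 minutes, "b" at 5 minutes.
samples = [("a", 600_000), ("a", 2_700_000), ("b", 300_000)]
result = stuck_connections(samples)
```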
Find Error Logs for Openflow Runtimes
SELECT
  timestamp,
  Deployment_ID,
  Runtime_Key,
  parsed_log:level as log_level,
  parsed_log:loggerName as logger,
  parsed_log:formattedMessage as message,
  parsed_log
FROM (
  SELECT
    timestamp,
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    -- Returns NULL for log lines that are not valid JSON
    TRY_PARSE_JSON(value) as parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd('minutes', -30, sysdate())
    AND record_type = 'LOG'
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
)
WHERE parsed_log:level = 'ERROR'
-- Sort in the outer query; ordering inside a subquery is not guaranteed to be preserved
ORDER BY timestamp DESC;
Find Running and Non-Running Processors
Some flows expect that all processors are in a “running” state, even if they are not actively processing data.
This query helps you find any processors that are running or in another state, such as:
- stopped
- invalid
- disabled
SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  record_attributes:component as Processor,
  record_attributes:id as Processor_ID,
  TO_NUMBER(value) as Running
FROM snowflake.telemetry.events
WHERE true
  AND record:metric:name = 'processor.run.status.running'
  AND record_type = 'METRIC'
  AND timestamp > dateadd(minutes, -30, sysdate());
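This query returns one row per metric sample, so a processor usually appears several times within the 30-minute window. As a possible follow-up step on exported results, the Python sketch below keeps only the most recent sample per processor and lists those whose latest value is 0. The function name, the tuple layout, and the sample data are hypothetical.

```python
def non_running_processors(samples):
    """Return the IDs of processors whose most recent running-status sample is 0.

    samples: iterable of (timestamp, processor_id, running) tuples, where
    running is the 0-or-1 value of processor.run.status.running.
    """
    latest = {}
    for timestamp, processor_id, running in samples:
        if processor_id not in latest or timestamp > latest[processor_id][0]:
            latest[processor_id] = (timestamp, running)
    return sorted(pid for pid, (_, running) in latest.items() if running == 0)


# Hypothetical samples; same-format ISO 8601 UTC strings sort chronologically.
samples = [
    ("2025-06-06T03:00:00Z", "proc-1", 1),
    ("2025-06-06T03:05:00Z", "proc-1", 0),
    ("2025-06-06T03:05:00Z", "proc-2", 1),
]
stopped = non_running_processors(samples)
```

A value of 0 only says the processor is not running. To learn whether it is stopped, invalid, or disabled, the corresponding processor.run.status.* metrics would need to be checked as well.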
Find High CPU Usage for Openflow Runtimes
Slow data flows or reduced throughput may be the result of a CPU bottleneck. Openflow Runtimes scale up automatically, based on the minimum and maximum number of nodes you have configured.
If an Openflow Runtime is using its maximum number of nodes and CPU usage remains high, consider:
1. Increasing the maximum number of nodes allocated to the Runtime
2. Troubleshooting the Connector or flow to identify the bottleneck
Snowsight Charts provide an easy way to visualize query results for CPU usage over time.
SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  resource_attributes:"k8s.pod.name" as Runtime_Pod,
  TO_NUMBER(value, 10, 3) * 100 as CPU_Usage_Percentage
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minute, -30, sysdate())
  AND record_type = 'METRIC'
  AND record:metric:name ilike 'container.cpu.usage'
  AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
  AND resource_attributes:"k8s.container.name" ilike '%-server'
ORDER BY timestamp desc, CPU_Usage_Percentage desc;
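To summarize the per-sample rows from this query, one option is to average CPU usage per pod and flag pods above a chosen threshold. The Python sketch below does that on exported rows. The function name, the sample data, and the 80% default are illustrative assumptions, not Openflow recommendations.

```python
from collections import defaultdict
from statistics import mean


def pods_with_high_cpu(samples, threshold_percent=80.0):
    """Return {pod_name: average CPU %} for pods averaging above the threshold.

    samples: iterable of (pod_name, cpu_usage_percentage) pairs.
    The 80% default is an illustrative choice only.
    """
    by_pod = defaultdict(list)
    for pod_name, cpu_percent in samples:
        by_pod[pod_name].append(cpu_percent)
    averages = {pod: mean(values) for pod, values in by_pod.items()}
    return {pod: avg for pod, avg in averages.items() if avg > threshold_percent}


# Hypothetical samples for two pods of one Runtime.
samples = [("pg-dev-0", 95.0), ("pg-dev-0", 85.0), ("pg-dev-1", 20.0)]
hot = pods_with_high_cpu(samples)
```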