オープンフローの監視

このトピックでは、Openflow の状態を監視し、問題をトラブルシューティングする方法について説明します。

Openflow ログへのアクセス

Snowflake は、 Openflowのセットアップ 時に構成したイベントテーブルに Openflow ログを送信します。Snowflakeでは、イベントテーブルクエリの WHERE 句にタイムスタンプを含めることを推奨しています。さまざまなSnowflakeコンポーネントによって生成される潜在的データ量のため、これは特に重要です。フィルターを適用すると、より小さなデータのサブセットを取得することができ、クエリのパフォーマンスが向上します。

イベントテーブルには以下の列があり、SnowflakeがOpenflowから収集したログに関する有用な情報を提供します。

  • TIMESTAMP: Snowflakeがいつログを収集したかを示します。

  • RESOURCE_ATTRIBUTES: ログメッセージを生成した Snowflake サービスを識別する JSON オブジェクトを提供します。例えば、Openflow のアプリケーションやデータプレーン ID などのデータを提供します。

    {
    "application": "openflow",
    "cloud.service.provider": "aws",
    "k8s.container.name": "pg-dev-server",
    "k8s.container.restart_count": "0",
    "k8s.namespace.name": "runtime-pg-dev",
    "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal",
    "k8s.pod.name": "pg-dev-0",
    "k8s.pod.start_time": "2025-04-25T22:14:29Z",
    "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6",
    "k8s.statefulset.name": "pg-dev",
    "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a"
    }
    
    Copy
  • RECORD_ATTRIBUTES: Snowflakeサービスの場合、エラーソース(標準出力または標準エラー)を識別子で指定します。

    {
    "log.file.path": "/var/log/pods/runtime-pg-dev_pg-dev-0_94610175-1685-4c8f-b0a1-42898d1058e6/pg-dev-server/0.log",
    "log.iostream": "stdout",
    "logtag": "F"
    }
    
    Copy
  • VALUE: 標準出力と標準エラーは行に分割され、各行ごとにイベント・テーブルの記録が生成されます。

    "{\"timestamp\":1746655642080,\"nanoseconds\":80591397,\"level\":\"INFO\",\"threadName\":\"Clustering Tasks Thread-2\",\"loggerName\":\"org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater\",\"formattedMessage\":\"Heartbeat created at 2025-05-07T22:07:22.071Z and sent to pg-dev-0.pg-dev.runtime-pg-dev.svc.cluster.local:8445 at 2025-05-07T22:07:22.080590784Z; determining Cluster Coordinator took 7 millis; DNS lookup for coordinator took 0 millis; connecting to coordinator took 0 millis; sending heartbeat took 1 millis; receiving first byte from response took 1 millis; receiving full response took 1 millis; total time was 9 millis\",\"throwable\":null}"
    

ランタイムのエラーレベルログの検索

SELECT
    timestamp,
    resource_attributes:"k8s.namespace.name" AS runtime_key,
    parse_json(value::string):loggerName AS logger,
    parse_json(value::string):formattedMessage AS log_value
FROM openflow.telemetry.EVENTS_<account-id>
WHERE true
AND timestamp < dateadd('days', -1, sysdate())
AND record_type = 'LOG'
AND resource_attributes:"k8s.namespace.name" LIKE 'runtime-%'
AND resource_attributes:"k8s.container.name" LIKE '%-server'
AND parse_json(value::string):level = 'ERROR'
ORDER BY timestamp desc
LIMIT 5;
Copy

ログから "causeed by "例外を検索

これらの例外は、断続的な接続の問題、データの非互換性、または関連する原因によって発生する可能性があります。

SELECT
    timestamp,
    RESOURCE_ATTRIBUTES:"k8s.namespace.name" AS Namespace,
    RESOURCE_ATTRIBUTES:"k8s.pod.name" AS Pod,
    RESOURCE_ATTRIBUTES:"k8s.container.name" AS Container,
    value
FROM openflow.telemetry.EVENTS_<account-id>
WHERE true
AND record_type = 'LOG'
AND timestamp > dateadd(minute, -5, sysdate())
AND value LIKE '%Caused By%'
ORDER BY timestamp desc
LIMIT 10;
Copy

実行中、停止中、またはその他の状態にあるプロセッサーの検索

SELECT
    timestamp,
    RECORD_ATTRIBUTES:component AS Processor,
    RECORD_ATTRIBUTES:id AS Processor_ID,
    value AS Running
FROM openflow.telemetry.EVENTS_<account-id>
WHERE true
AND RECORD:metric:name = 'processor.run.status.running'
AND RECORD_TYPE='METRIC'
AND timestamp > dateadd(minute, -5, sysdate());
Copy

ランタイムの CPU 使用率が高いものを検索

遅いデータフローやスループットの低下は、 CPU のボトルネックが原因かもしれません。ランタイムは、構成した最小ノード数と最大ノード数に基づいて、自動的にスケールアップする必要があります。ランタイムが最大ノード数を使用しているにもかかわらず、 CPU 使用率が高い場合は、ランタイムに割り当てる最大ノード数を増やすか、コネクターまたはフローのトラブルシューティングを行ってください。

SELECT
    timestamp,
    RESOURCE_ATTRIBUTES:"k8s.namespace.name" AS Namespace,
    RESOURCE_ATTRIBUTES:"k8s.pod.name" AS Pod,
    RESOURCE_ATTRIBUTES:"k8s.container.name" AS Container,
    value AS CPU_Usage
FROM openflow.telemetry.EVENTS_<account-id>
WHERE TIMESTAMP > dateadd(minute, -1, sysdate())
AND RECORD_TYPE = 'METRIC'
AND RECORD:metric:name ilike 'container.cpu.usage'
AND RESOURCE_ATTRIBUTES:"k8s.namespace.name" ilike 'runtime%'
AND RESOURCE_ATTRIBUTES:"k8s.container.name" ilike '%server'
AND RESOURCE_ATTRIBUTES:"k8s.namespace.name" NOT IN ('runtime-infra', 'runtime-operator')
ORDER BY TIMESTAMP desc, CPU_Usage desc;
Copy

利用可能なメトリクス

ランタイムで利用可能なメトリクス

以下は、ランタイムの利用可能なメトリクスのリストです。

メトリック

単位

説明

cores.load

percentage

gauge

ランタイムで利用可能な全コアの平均負荷。すべての利用可能なコアが完全に使用されている場合、最大値は1です。

cores.available

CPU コア

gauge

ランタイムが利用可能な CPU コア数

storage.free

bytes

gauge

ランタイムがストレージタイプごとに使用できる空きストレージの量。ストレージタイプは3種類あります。

  • flowfile

  • content

  • provenance

RECORD_ATTRIBUTES 列で storage.type メトリクスを表示できます。

storage.used

bytes

gauge

ストレージタイプごとの使用ストレージ量。ストレージタイプは3種類あります。

  • flowfile

  • content

  • provenance

RECORD_ATTRIBUTES 列で storage.type メトリクスを表示できます。

CPU メトリクスのサンプル・クエリ

SELECT * from events
WHERE
1 = 1
AND record_type = 'METRIC'
AND resource_attributes:application = 'openflow'
AND record:metric.name IN ('cores.load', 'cores.available')
ORDER BY timestamp desc
LIMIT 1000;
Copy

コネクターで使用可能なメトリクス

以下は、コネクターで利用可能なメトリクスのリストです。

メトリック

単位

説明

processgroup.bytes.received

bytes

gauge

ソースからの平均消費バイト数

processgroup.bytes.sent

bytes

gauge

宛先に書き込まれた平均バイト数

イベントテーブルからこれらのメトリクスをクエリするには、Openflow ランタイムキャンバスからプロセスグループ名と ID を見つけ、 RECORD_ATTRIBUTES 列からフィルターする必要があります。