高性能アーキテクチャのSnowpipe Streamingのベストプラクティス

このガイドでは、 高性能アーキテクチャを持つSnowpipe Streamingを使用して、堅牢なデータインジェスチョンパイプラインを設計および実装するための主要なベストプラクティスについて説明します。これらのベストプラクティスに従うことで、耐久性と信頼性に優れたパイプラインになり、効率的なエラー処理が可能になります。

チャネルの戦略的管理

パフォーマンスと長期的な安定性のために、次のチャネル管理戦略を適用します。

  • 存続期間の長いチャネルを使用する:オーバーヘッドを最小限に抑えるには、一度チャネルを開き、インジェスチョンタスク中アクティブに保ちます。繰り返しチャネルを開いたり閉じたりしないでください。

  • 決定論的チャネル名を使用する:トラブルシューティングを簡素化し、自動復旧プロセスを促進するために、一貫性のある予測可能な命名規則を適用します。たとえば source-env-region-client-id が挙げられます。

  • 複数のチャネルでスケールアウトする:スループットを増やすには、複数のチャネルを開きます。これらのチャネルは、サービス制限とスループット要件に応じて、単一のターゲットパイプをポイントすることも、複数のパイプをポイントすることもできます。

  • チャネルステータスを監視する:getChannelStatus メソッドを定期的に使用して、インジェスチョンチャネルの健全性を監視します。

    • last_committed_offset_token を追跡して、データが正常にインジェストされ、パイプラインが進行中であることを確認します。

    • row_error_count を監視して、不正な記録やその他のインジェスチョンの問題を早期に検出します。

スキーマを一貫して検証する

インジェスチョンの失敗を防ぎ、データ統合を維持するために、受信データが予想されるテーブルスキーマに準拠していることを確認します。

  • クライアント側の検証:クライアント側にスキーマ検証を実装して、即座にフィードバックを提供し、サーバー側のエラーを削減します。完全な行ごとの検証は最大の安全性を提供しますが、より優れたパフォーマンスを発揮するメソッドには、選択的検証が含まれる可能性があります。たとえば、バッチの境界またはサンプリング行などです。

  • サーバー側の検証:パフォーマンスに優れたアーキテクチャは、スキーマ検証をサーバーにオフロードできます。ターゲットパイプとターゲットテーブルへのインジェスチョン中にスキーマの不一致が発生した場合、エラーとその数は:code:`getChannelStatus`を通じて報告されます。

クライアント側のメタデータ列を追加する

堅牢なエラー検出と復旧を実現するには、行ペイロードの一部としてインジェスチョンメタデータを送信する必要があります。これには、データ形状と PIPE 定義を事前に計画する必要があります。

インジェスチョンの前に、行ペイロードに次の列を追加します。

  • CHANNEL_ID;(たとえば、コンパクトINTEGER)

  • STREAM_OFFSET;(Kafkaパーティションオフセットなど、チャネルごとに単調に増加する BIGINT。)

また、これらの列はチャネルごとに記録を一意に識別し、データの基点をトレースできるようにします。

複数のパイプが同じターゲットテーブルにインジェストする場合は、任意で:code:`PIPE_ID`列を追加します。この列を使用すると、行をインジェスチョンパイプラインまでトレースできます。説明的なパイプ名を別のルックアップテーブルに格納し、それらをコンパクト整数にマッピングしてストレージコストを削減することができます。

メタデータオフセットを使用したエラーの検出と復旧

チャネルモニタリングとメタデータ列を組み合わせて、問題を検出して復旧します。

  • ステータスを監視する::code:`getChannelStatus`を定期的に確認してください。:code:`row_error_count`が増加している場合は、何らかの問題が発生している可能性が高いです。

  • **欠落しているレコードを検出する: エラーが検出された場合は、SQLクエリを使用して:code:`STREAM_OFFSET`シーケンスのギャップをチェックし、欠落している、または順序が乱れたレコードを特定します。

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

REST API リクエストに対する圧縮の使用

Snowpipe Streaming RESTAPI を使用する場合は、圧縮を使用してリクエストごとにより多くのデータを送信し、ネットワークのオーバーヘッドを削減します。

REST API にはリクエストあたり4 MB の物理的な上限があるにもかかわらず、この上限は監視対象の転送サイズに適用されます。圧縮を使用すると、より大きな非圧縮データ量を各リクエストに適合させることができ、より高いスループットを可能にし、必要とする API 呼び出しの数を削減することができます。

Gzipもサポートされていますが、Snowflakeはパフォーマンスに優れた圧縮アルゴリズムとしてZSTDの使用を推奨します。

MATCH_BY_COLUMN_NAME でインジェスチョンのパフォーマンスとコストを最適化する

すべてのデータを単一の VARIANT 列にインジェストする代わりに、ソースデータから必要な列をマップするようにパイプを構成します。これを実行するには、MATCH_BY_COLUMN_NAME = CASE_SENSITIVE を使用するか、パイプ定義で変換を適用します。このベストプラクティスは、インジェスチョンコストを最適化するだけでなく、ストリーミングデータパイプラインの全体的なパフォーマンスを向上させます。

このベストプラクティスには、次の重要な利点があります。

  • MATCH_BY_COLUMN_NAME = CASE_SENSITIVE を使用することで、ターゲットテーブルにインジェストされたデータ値に対してのみ請求されます。反対に、単一の VARIANT 列にデータをインジェストすると、キーと値両方を含むすべての JSON バイトに対して請求されます。冗長または多数の JSON キーを持つデータの場合、これによりインジェスチョンコストが大幅かつ不必要に増加する可能性があります。

  • Snowflakeの処理エンジンは、より計算効率に優れています。JSON オブジェクト全体を VARIANT に解析してから必要な列を抽出する代わりに、このメソッドは必要な値を直接抽出します。

半構造化データに対するネイティブデータ型の使用

最適なパフォーマンスとデータの整合性を保つために、シリアル化された文字列ではなくネイティブ言語オブジェクトを使用して半構造化データを提供します。

  • パフォーマンス:ネイティブオブジェクトの場合、SDK はSnowflakeサーバーで追加の解析ステップを必要とせずに、より効率的にデータを処理できます。

  • タイプセーフティ:高性能アーキテクチャは、文字列リテラルをリテラルテキストとして扱います。ネイティブオブジェクトを使用することで、データをエスケープされた文字列値ではなく構造化された JSON として保存するようにできます。

Javaの例:

// Preferred: SDK converts the List to a structured ARRAY
row.put("tags", Arrays.asList("electronics", "sale"));
Copy

Pythonの例:

# Preferred: SDK converts the dict to a structured VARIANT
row["payload"] = {"event_id": 101, "status": "active"}
Copy

Prometheusメトリックを取得する

Snowpipe Streaming高性能クライアントからパフォーマンスメトリックを取得するには、組み込みのPrometheusメトリックサーバーを有効にし、エンドポイントをスクレイピングするようにPrometheusサービスを構成する必要があります。

アプリケーションを実行する前に、環境変数 SS_ENABLE_METRICStrue に設定してメトリックサーバーを有効にします。

Snowpipe Streamingインジェストプロセスを実行しているホスト上のメトリックエンドポイントをスクレイピングします。デフォルトのパスは、 SS_METRICS_IP および SS_METRICS_PORT によって定義されたホストとポート上の /metrics です。

例:メトリックのエンドポイント(ローカルプロセス/開発ボックス)の確認

# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)

# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics
Copy

例:Prometheusスクレイピング設定

Snowpipe Streamingクライアントを実行しているホストにPrometheusサービスをポイントします。

scrape_configs:
  - job_name: snowpipe_streaming_hp
    metrics_path: /metrics   # default is /metrics
    static_configs:
      - targets: ['127.0.0.1:50000']
Copy

回復力を重視した設計

インジェスチョンをtry-catchブロックで囲む

:code:`insertRows`が常に成功するとは限りません。インジェスチョンループが:code:`SFException`をキャッチし、HTTPステータスコードを解釈できることを確認してください。特に、無効化を示す 409 や、スロットリングを示す 429 に注意が必要です。

エクスポネンシャル・バックオフを実装

リトライ可能なエラー(429、500、503)が発生した場合、すぐに再試行しないようにします。再試行ごとに待ち時間を段階的に増やしていくエクスポネンシャル・バックオフ戦略を用い、システムが回復する時間を与えてください。

オフセットトークンを使用して進捗を確認

定期的に:code:`getLatestCommittedOffsetToken`を呼び出し、どのデータが正常に保存されたかを追跡します。409エラーが発生した場合は、チャネルを再オープンした後にこのトークンを使用して、データの再送を開始すべき正確な箇所を特定します。

チャネルステータスを監視

定期的に getChannelStatus() をチェックします。ステータスコードが:code:`SUCCESS`以外であれば、チャネルやクライアント接続をリセットするなどのエラーハンドリングロジックを実行します。