高性能アーキテクチャのSnowpipe Streamingのベストプラクティス

このガイドでは、 高性能アーキテクチャを持つSnowpipe Streamingを使用して、堅牢なデータインジェスチョンパイプラインを設計および実装するための主要なベストプラクティスについて説明します。これらのベストプラクティスに従うことで、耐久性と信頼性に優れたパイプラインになり、効率的なエラー処理が可能になります。

チャネルの戦略的管理

パフォーマンスと長期的な安定性のために、次のチャネル管理戦略を適用します。

  • 存続期間の長いチャネルを使用する:オーバーヘッドを最小限に抑えるには、一度チャネルを開き、インジェスチョンタスク中アクティブに保ちます。繰り返しチャネルを開いたり閉じたりしないでください。

  • 決定論的チャネル名を使用する:トラブルシューティングを簡素化し、自動復旧プロセスを促進するために、一貫性のある予測可能な命名規則を適用します。たとえば source-env-region-client-id が挙げられます。

  • 複数のチャネルでスケールアウトする:スループットを増やすには、複数のチャネルを開きます。これらのチャネルは、サービス制限とスループット要件に応じて、単一のターゲットパイプをポイントすることも、複数のパイプをポイントすることもできます。

  • チャネルステータスを監視する:getChannelStatus メソッドを定期的に使用して、インジェスチョンチャネルの健全性を監視します。

    • last_committed_offset_token を追跡して、データが正常にインジェストされ、パイプラインが進行中であることを確認します。

    • row_error_count を監視して、不正な記録やその他のインジェスチョンの問題を早期に検出します。

スキーマを一貫して検証する

インジェスチョンの失敗を防ぎ、データ統合を維持するために、受信データが予想されるテーブルスキーマに準拠していることを確認します。

  • クライアント側の検証:クライアント側にスキーマ検証を実装して、即座にフィードバックを提供し、サーバー側のエラーを削減します。完全な行ごとの検証は最大の安全性を提供しますが、より優れたパフォーマンスを発揮するメソッドには、選択的検証が含まれる可能性があります。たとえば、バッチの境界またはサンプリング行などです。

  • サーバー側の検証:高性能アーキテクチャは、スキーマの検証をサーバーにオフロードできます。ターゲットパイプとテーブルへのインジェスチョン中にスキーマの不一致が発生した場合、エラーとその数は getChannelStatus を通じて報告されます。

信頼できる復旧のための状態を永続化する

データの損失や重複を防ぐには、アプリケーションはその状態を維持して、再起動や障害を正常に処理する必要があります。

  • オフセットトークンを永続化する:各 API 呼び出しの成功後に、last_committed_offset_token を耐久性のあるストレージへ永続化します。

  • 最後のポイントから再開する:アプリケーションの再起動時に、最後にコミットされたトークンをSnowflakeから取得し、その正確なポイントからインジェスチョンを再開します。これにより、1回の処理が保証され、継続性が保証されます。

クライアント側のメタデータ列を追加する

堅牢なエラー検出と復旧を実現するには、行ペイロードの一部としてインジェスチョンメタデータを送信する必要があります。これには、データ形状と PIPE 定義を事前に計画する必要があります。

インジェスチョンの前に、行ペイロードに次の列を追加します。

  • ``CHANNEL_ID``(たとえば、コンパクト INTEGER)

  • STREAM_OFFSET``(Kafkaパーティションオフセットなど、チャネルごとに単調に増加する ``BIGINT。)

また、これらの列はチャネルごとに記録を一意に識別し、データの基点をトレースできるようにします。

オプション:複数のパイプが同じターゲットテーブルにインジェストする場合は、PIPE_ID 列を追加します。これを行うと、行をインジェスチョンパイプラインまで簡単に追跡できます。説明的なパイプ名を別のルックアップテーブルに格納し、それらをコンパクト整数にマッピングしてストレージコストを削減することができます。

メタデータオフセットを使用したエラーの検出と復旧

チャネルモニタリングとメタデータ列を組み合わせて、問題を検出して復旧します。

  • ステータスを監視する:定期的に getChannelStatus をチェックします。増加する row_error_count は、潜在的な問題の強力なインジケーターです。

  • 欠落した記録を検出する:エラーが検出された場合は、SQL クエリを使用して STREAM_OFFSET シーケンスのギャップをチェックし、欠落している、またはばらばらの記録を特定します。

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

MATCH_BY_COLUMN_NAME でインジェスチョンのパフォーマンスとコストを最適化する

すべてのデータを単一の VARIANT 列にインジェストする代わりに、ソースデータから必要な列をマップするようにパイプを構成します。これを実行するには、MATCH_BY_COLUMN_NAME = CASE_SENSITIVE を使用するか、パイプ定義で変換を適用します。このベストプラクティスは、インジェスチョンコストを最適化するだけでなく、ストリーミングデータパイプラインの全体的なパフォーマンスを向上させます。

このベストプラクティスには、次の重要な利点があります。

  • MATCH_BY_COLUMN_NAME = CASE_SENSITIVE を使用することで、ターゲットテーブルにインジェストされたデータ値に対してのみ請求されます。反対に、単一の VARIANT 列にデータをインジェストすると、キーと値両方を含むすべての JSON バイトに対して請求されます。冗長または多数の JSON キーを持つデータの場合、これによりインジェスチョンコストが大幅かつ不必要に増加する可能性があります。

  • Snowflakeの処理エンジンは、より計算効率に優れています。JSON オブジェクト全体を VARIANT に解析してから必要な列を抽出する代わりに、このメソッドは必要な値を直接抽出します。