高性能アーキテクチャを備えたSnowpipe Streamingでのエラーログ

Snowpipe Streamingのエラーログは、Snowflakeの DML エラーログ 機能を活用し、データ取り込みエラーを管理および復旧するための堅牢な方法を提供します。この機能により、データが気付かれないうちに失われるのを防ぎ、不具合のあるデータ行を明確に把握できるようになります。エラーログがオンになっている場合、エラーのないデータはターゲットテーブルに引き続きロードされますが、処理に失敗した行は、レビューと回復のために専用のエラーテーブルに自動的にルーティングされます。

重要

エラーテーブルに保存されるデータは、パイプ変換が適用される前の、 API または SDK に送信された元のペイロードです。パイプがフィールドをドロップまたは変換した場合でも、元のペイロード全体はエラーテーブルに永続化されます。

概要

Snowpipe Streaming高性能アーキテクチャを使用すると、データ処理はSnowflakeのサーバー側で行われます。高性能アーキテクチャは ON_ERROR = CONTINUE モードで暗黙的に動作するため、有効な行は取り込まれる一方、問題のある行はスキップされます。

エラー処理のオプション

次の方法で取り込みエラーをモニターおよび処理できます。

エラーテーブルなし:

  • getChannelStatus() を使用して、集計されたエラーカウント、最後のエラーメッセージ、最後のエラータイムスタンプをモニターします。

  • SNOWPIPE_STREAMING_CHANNEL_HISTORY ビューをクエリして、過去のエラーの傾向とパターンを確認します。

これらのメソッドでは、エラーが発生した こと と、いくつ エラーが発生したかをお知らせしますが、どの行 が失敗したかや、そのペイロードはお知らせしません。

エラーテーブルあり:

  • 処理に失敗した行は、専用のエラーテーブルに自動的に取り込まれます。

  • 各エラー行には、完全な元のペイロードと詳細なエラーメタデータが含まれます。

  • 標準の SQL を使用して、失敗した行をクエリ、分析、再処理できます。

エラーテーブルは、失敗した行とその理由を正確に表示することで全体像を完成させ、完全なデバッグと復旧を可能にします。

エラーログをオンにする

Snowpipe Streamingのエラーログをオンにするには、ターゲットテーブルの ERROR_LOGGING プロパティを設定します。エラーログの有効化および構成の詳細については、 テーブルの DML エラーログの構成 をご参照ください。

-- For a new table:
CREATE TABLE my_streaming_table (...) ERROR_LOGGING = TRUE;

-- For an existing table:
ALTER TABLE my_streaming_table SET ERROR_LOGGING = TRUE;

エラーログがオンの場合、同じエラーテーブルに DML ステートメントとSnowpipe Streaming取り込みワークロードの両方からのエラーがキャプチャされます。

エラーテーブルのクエリ

ベーステーブルのエラーテーブルをクエリするには、 ERROR_TABLE テーブル関数を使用します。エラーテーブルスキーマ、アクセス制御、およびサポートされている操作の詳細については、 エラーログとエラーテーブル をご参照ください。

SELECT * FROM ERROR_TABLE(my_streaming_table) ORDER BY timestamp;

結果には、取り込みストリーム内のすべてのエラー行の行が含まれます。

Snowpipe Streamingエラーフィールド

Snowpipe Streamingエラーは DML エラー( timestampquery_iderror_codeerror_metadataerror_data )として同じ エラーテーブル列 に保存されます。error_metadata および error_data オブジェクトには、以下のセクションで説明するSnowpipe Streamingのための追加フィールドが含まれています。

Snowpipe Streamingエラーの特定

Snowpipe Streamingからのエラーの場合は、 error_metadata:service フィールドには snowpipe_streaming が入力されます。このフィールドを使用して、ソースでエラーをフィルターします。

SELECT * FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming';

エラーメタデータの詳細

Snowpipe Streamingエラーの場合、 error_metadata:details オブジェクトには、次の追加フィールドが含まれます。

フィールド

説明

pipe_name

エラー行の取り込みに使用されるパイプの名前。

channel_name

エラー行の取り込みに使用されるチャネルの名前。

offset_token_upper_bound

エラー行を含む上限オフセットトークン。行は、このオフセットトークン以前のペイロードに表示されます。

error_data_truncated

生ペイロードがエラーテーブルに収まるように切り捨てられたかどうかを示します(最大128 MB )。

error_data_content_type

error_data 列に保存されているコンテンツの型を示します。エラーのデータコンテンツ型 をご参照ください。

エラーデータ形式

Snowpipe Streamingエラーの場合、 error_data:$1 フィールドには、エラー行を表す生ペイロードが含まれます。

ペイロードに無効な UTF-8文字が含まれている場合、生ペイロードはbase64エンコードのバイナリ文字列として保存されます。

エラーのデータコンテンツ型

error_data_content_type フィールドは、発生したエラーの型を示し、修正ステップを提案します。

json

エラー行は構文的に有効な JSON 文字列ですが、ターゲットテーブルへのデータの取り込み中に論理的エラーが発生しました。

一般的な論理的エラーは次のとおりです。

  • NULL不可の列がありません :NOT NULL 制約が設定されている必須列がペイロードで提供されませんでした。

  • 型変換エラー :JSON データ型はターゲット列型にキャストできません。たとえば、文字列値 "abc" は NUMBER 列に変換できません。

  • 変換エラー :ゼロ除算などのパイプ変換式を評価中にエラーが発生しました。

解決するには、取り込みエラーの原因となった error_metadata:error_message のエラーメッセージと error_metadata:error_source の列名を確認してください。PARSE_JSON(error_data:$1) でペイロードを解析し、データを修正し、ターゲットテーブルに再挿入します。

json-invalid

構文的に無効な JSON オブジェクトが取り込まれました。

解決するには、構文エラーに関する詳細が含まれている error_metadata:error_message のエラーメッセージを確認してください。error_data:$1 に保管されたペイロードを修正し、ターゲットテーブルに再挿入します。

binary-base64

無効な UTF-8データが取り込まれました。エラーペイロードは、base64でエンコードされたバイナリ文字列としてエラーテーブルに保存されます。

このエラー型は通常、上流データソースの形式の不一致またはエンコードエラーを示します。

解決するには、データソースと、それが生成するデータ形式とエンコードを調べます。error_data:$1 に保存されているペイロードを BASE64_DECODE_STRING 関数でデコードして、生のバイトを検査し、不正な UTF-8シーケンスを特定します。

エラー復旧ワークフロー

次の例は、エラーをクエリし、それらを分析し、修正されたデータを再挿入する方法を示しています。

最近のエラーのクエリ

SELECT
    timestamp,
    error_code,
    error_metadata:error_message::STRING AS error_message,
    error_metadata:details:channel_name::STRING AS channel,
    error_metadata:details:pipe_name::STRING AS pipe,
    error_metadata:details:error_data_content_type::STRING AS content_type,
    error_data:"$1"::STRING AS raw_payload
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND timestamp >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
ORDER BY timestamp DESC;

エラー分布の分析

SELECT
    error_code,
    error_metadata:error_message::STRING AS error_message,
    COUNT(*) AS error_count
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND timestamp >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
GROUP BY 1, 2
ORDER BY error_count DESC;

回復可能なエラーの修正と再挿入

有効な JSON ペイロードのエラーの場合、データの解析、修正、再挿入ができます。

INSERT INTO my_streaming_table (col1, col2, col3)
SELECT
    TRY_CAST(PARSE_JSON(error_data:"$1"):col1 AS NUMBER),
    PARSE_JSON(error_data:"$1"):col2::STRING,
    TRY_CAST(PARSE_JSON(error_data:"$1"):col3 AS TIMESTAMP)
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND error_metadata:details:error_data_content_type = 'json'
  AND timestamp >= DATEADD(hour, -24, CURRENT_TIMESTAMP());

エラーの再処理が成功すると、エラーテーブルを切り捨てることができます。

TRUNCATE ERROR_TABLE(my_streaming_table);

請求

Snowpipe Streaming取り込みは、標準のSnowpipe Streaming料金で請求されます。エラーログをオンにしても、取り込みコストは変わりません。失敗した行をエラーテーブルにルーティングするための追加の取り込み料金はありません。

Snowflakeは、他のテーブルと同じように、標準ストレージレートでエラーテーブルに保存されたデータに対して料金を請求します。エラーテーブルには、失敗した各行の生ペイロードとエラーメタデータが保存されます。

Snowpipe Streamingコストの詳細については、 Snowpipe Streaming high-performance architecture: Understand your costs をご参照ください。

制限事項

  • エラーテーブルは、サーバー側のデータ処理(解析および変換)中に発生したエラーをキャプチャします。他のステージからのエラー( SDK 検証、 API 失敗やその他のサーバー側の非同期エラー)は、エラーテーブルにキャプチャされません。getChannelStatus() を使用して、サーバー側の非同期エラーをモニターします。

  • 入ってくる行の失敗率が高いと、エラー情報を保存するオーバーヘッドのために処理のレイテンシが増加する可能性があります。

  • 128 MB を超えるペイロードは切り捨てられます。error_data_truncated フィールドは、切り捨てが発生したタイミングを示します。

  • エラーテーブルは、Snowpipe Streaming高性能アーキテクチャでのみ使用可能です。従来のアーキテクチャでは、エラー処理は SDK を通じてクライアント側で管理されます。