Snowpipeのトラブルシューティング

このトピックでは、Snowpipeを使用してデータをロードする際の問題をトラブルシューティングするための体系的なアプローチについて説明します。

このトピックの内容:

Snowpipeの問題をトラブルシューティングする手順は、データファイルの読み込みに使用されるワークフローによって異なります。

クラウドストレージイベント通知を使用したデータの自動ロード

一般的なトラブルシューティングステップ

次のステップを実行して、ファイルの自動ロードを妨げるほとんどの問題の原因を特定します。

ステップ1: パイプステータスを確認する

パイプの現在のステータスを取得します。結果は JSON 形式で表示されます。詳細については、 SYSTEM$PIPE_STATUS をご参照ください。

以下の値を確認します。

lastReceivedMessageTimestamp

メッセージキューから受信した、最後のイベントメッセージのタイムスタンプを指定します。たとえば、メッセージに関連付けられたパスがパイプ定義のパスと一致しない場合、このメッセージは特定のパイプに適用されない場合があります。また、作成されたデータオブジェクトによってトリガーされたメッセージのみが自動インジェストパイプによって消費されます。

タイムスタンプが予想よりも早い場合、サービス構成か(つまり、Amazon SQS、Amazon SNS、またはAzure Event Grid)、サービス自体のいずれかに問題があることを示している場合があります。フィールドが空の場合、サービス構成設定を確認します。フィールドにタイムスタンプが含まれているが、予想よりも早い場合、サービス構成で設定が変更されたかどうかを確認します。

lastForwardedMessageTimestamp

パイプに転送された一致するパスを持つ、最後の「オブジェクトの作成」イベントメッセージのタイムスタンプを指定します。

イベントメッセージがメッセージキューから受信されてもパイプに転送されない場合、新しいデータファイルが作成されるBLOBストレージパスと、Snowflakeステージおよびパイプ定義で指定された結合パスとの間に不一致がある可能性があります。ステージおよびパイプ定義で指定されているパスを確認します。パイプ定義で指定されたパスは、ステージ定義のすべてのパスに追加されます。

ステップ2。テーブルの COPY 履歴を表示

イベントメッセージを受信して転送する場合は、ターゲットテーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY をご参照ください。

STATUS 列は、特定のファイルセットがロードされたか、部分的にロードされたか、ロードに失敗したかを示します。 FIRST_ERROR_MESSAGE 列は、試行が部分的にロードされたか失敗した場合の理由を示します。

ファイルのセットに複数の問題がある場合、 FIRST_ERROR_MESSAGE 列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS に設定して COPY INTO <テーブル> ステートメントを実行します。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Snowpipeを使用してロードしようとしたファイルのセットを参照します。コピーオプションの詳細については、 COPY INTO <テーブル> をご参照ください。

COPY_HISTORY 出力に予想されるファイルのセットが含まれていない場合は、以前の期間をクエリします。ファイルが以前のファイルの複製であった場合は、元のファイルをロードしようとしたときに、ロード履歴にアクティビティが記録されていた可能性があります。

ステップ3:データファイルを検証する

ロード操作でデータファイルのエラーが発生した場合、 COPY_HISTORY テーブル関数は各ファイルで最初に発生したエラーを説明します。データファイルを検証するには、 VALIDATE_PIPE_LOAD 関数をクエリします。

Microsoft Azure Data Lake Storage Gen2ストレージで生成されたファイルが読み込まれていません

現在、一部のサードパーティクライアントは ADLS Gen 2 REST API にある FlushWithClose を呼び出しません。このステップは、Snowpipeにファイルをロードするように通知するイベントをトリガーするために必要です。Snowpipeをトリガーしてこれらのファイルをロードするために、手動による REST API の呼び出しを試みてください。

close 引数を使用した Flush メソッドの詳細については、https://docs.microsoft.com/en-us/dotnet/api/azure.storage.files.datalake.datalakefileclient.flushをご参照ください。 close パラメーターの負荷に関する追加の REST API 参照情報については、https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/updateをご参照ください。

Amazon SNS トピックのサブスクリプションが削除された後に、Snowpipeによるファイルの読み込みが停止する

ユーザーが特定のAmazon Simple Notification Service(SNS)トピックを参照するパイプオブジェクトを初めて作成するとき、SnowflakeはSnowflakeが所有するAmazon Simple Queue Service(SQS)キューをトピックにサブスクライブします。AWS 管理者が SNS トピックへの SQS サブスクリプションを削除すると、トピックを参照するパイプはAmazon S3からイベントメッセージを受信しなくなります。

Snowflake SQS キューを SNS トピックに再度サブスクライブする

  1. Snowflakeサポート に連絡して、Snowflakeアカウントのメタデータから SNS トピックサブスクリプションを削除します。

  2. AWS 管理コンソールで SNS、またはお好みの AWS ツールを使用して、トピックを削除します。SNS トピックを再作成します。トピックに元のトピックと同じ名前を付けます。手順については、AWS ドキュメントをご参照ください。

  3. SNS トピックを参照する新しいパイプを作成します。手順については、 ステップ3:自動インジェストを有効にしたパイプを作成する をご参照ください。Snowflakeは、Snowflakeが所有する SQS キューを新しい SNS トピックにサブスクライブします。

SNSトピックサブスクリプションの削除前に機能していたすべてのパイプが、S3からのイベントメッセージの受信を再開するはずです。

このプロセスの背後にある理由の背景情報については、AWSナレッジベースの この記事 をご参照ください。

Snowpipe RESTエンドポイントの呼び出しによるデータのロード

ステップ1:認証の問題を確認する

Snowpipe REST エンドポイントは、 JSON Webトークン(JWT)でキーペアの認証を使用します。

Python/Javaインジェスト SDKs が JWT を生成します。REST API を直接呼び出す場合は、それらを生成する必要があります。リクエストで JWT トークンが提供されない場合、REST エンドポイントからエラー 400 が返されます。無効なトークンが提供されると、次のようなエラーが返されます。

snowflake.ingest.error.IngestResponseError: Http Error: 401, Vender Code: 390144, Message: JWT token is invalid.

ステップ2。テーブルの COPY 履歴を表示する

Snowpipeを使用して試行されたデータロードを含む、テーブルのロードアクティビティ履歴をクエリします。詳細については、 COPY_HISTORY をご参照ください。 STATUS 列は、特定のファイルセットがロードされたか、部分的にロードされたか、ロードに失敗したかを示します。 FIRST_ERROR_MESSAGE 列は、試行が部分的にロードされたか失敗した場合の理由を示します。

ファイルのセットに複数の問題がある場合、 FIRST_ERROR_MESSAGE 列は最初に発生したエラーのみを示します。ファイル内のすべてのエラーを表示するには、 VALIDATION_MODE コピーオプションを RETURN_ALL_ERRORS に設定して COPY INTO <テーブル> ステートメントを実行します。VALIDATION_MODE コピーオプションは、ロードされるデータを検証し、指定された検証オプションに基づいて結果を返すように COPY ステートメントに指示します。このコピーオプションが指定されている場合、データはロードされません。ステートメントで、Snowpipeを使用してロードしようとしたファイルのセットを参照します。コピーオプションの詳細については、 COPY INTO <テーブル> をご参照ください。

ステップ3:パイプステータスを確認する

COPY_HISTORY テーブル関数が調査中のデータロードに対して0の結果を返す場合、パイプの現在の状態を取得します。結果は JSON 形式で表示されます。詳細については、 SYSTEM$PIPE_STATUS をご参照ください。

executionState キーは、パイプの実行状態を識別します。たとえば、 PAUSED はパイプが現在一時停止していることを示します。パイプの所有者は、 ALTER PIPE を使用してパイプの実行を再開できます。

executionState 値がパイプの開始に関する問題を示している場合は、 error キーで詳細を確認してください。

ステップ4:データファイルを検証する

ロード操作でデータファイルのエラーが発生した場合、 COPY_HISTORY テーブル関数は各ファイルで最初に発生したエラーを説明します。データファイルを検証するには、 VALIDATE_PIPE_LOAD 関数をクエリします。

その他の問題

ファイルセットのアンロード

COPY_HISTORY 関数の出力が、ファイルのサブセットがロードされていないことを示している場合、パイプを「更新」しようとする場合があります。

この状況は、次のいずれかの状況で発生する可能性があります。

  • 以前、外部ステージは、 COPY INTO テーブル コマンドを使用してデータを一括ロードしていた。

  • REST API:

    • 外部イベントドリブン機能を使用して REST APIsを呼び出し、データファイルのバックログは、イベントが構成される前に外部ステージにすでに存在していました。

  • 自動取り込み:

    • イベント通知が構成される前の外部ステージには、データファイルのバックログがすでに存在していました。

    • イベント通知の失敗により、一連のファイルがキューに入れられませんでした。

構成されたパイプを使用して外部ステージにデータファイルをロードするには、 ALTER PIPE ... REFRESH ステートメントを実行します。

変更済みデータをリロードできない、変更済みデータが意図せずにロードされた

Snowflakeはファイルロードメタデータを使用して、テーブル内の同じファイル(およびデータの複製)の再ロードを防ぎます。Snowpipeは、後で変更された(つまり、異なるeTagを持つ)場合でも、同じ名前のファイルをロードしません。

ファイルロードメタデータは、テーブルではなく、 パイプオブジェクト に関連付けられています。その結果:

  • 既にロードされたファイルと同じ名前のステージングされたファイルは、新しい行の追加や、ファイルのエラーの修正などの変更があっても無視されます。

  • TRUNCATE TABLE コマンドを使用してテーブルを切り捨てても、メタデータをロードしているSnowpipeファイルは削除 されません

ただし、パイプは 14日間 のロード履歴メタデータのみを保持します。したがって、次のようになります。

14日以内に変更され、再度ステージングされたファイル

Snowpipeは、再度ステージングされた変更済みファイルを無視します。現在、同じデータファイルをリロードするには、 CREATE OR REPLACE PIPE 構文を使用してパイプオブジェクトを再作成する必要があります。

次の例では、 Snowpipe REST APIを使用したデータロードの準備 のステップ1の例に基づいて mypipe パイプを再作成します。

create or replace pipe mypipe as copy into mytable from @mystage;
14日後に変更され、再びステージングされたファイル

Snowpipeはデータを再度ロードするため、ターゲットテーブルにレコードが重複する可能性があります。

さらに、アクティブなSnowpipeロードと同じバケット/コンテナ、パス、およびターゲットテーブルを参照する COPY INTO <テーブル> ステートメントが実行された場合、ターゲットレコードに重複レコードをロードできます。COPY コマンドとSnowpipeのロード履歴は、Snowflakeに個別に保存されます。履歴ステージングデータを読み込んだ後、パイプ構成を使用して手動でデータを読み込む必要がある場合は、 ALTER PIPE ... REFRESH ステートメントを実行します。詳細については、このトピックの ファイルセットのアンロード をご参照ください。

COPY_HISTORYビューのLOAD_TIME値より前のCURRENT_TIMESTAMPを使用して挿入されたロード時間

テーブル設計者は、記録がテーブルにロードされるときに、現在のタイムスタンプをデフォルト値として挿入するタイムスタンプ列を追加できます。目的は、各記録がテーブルにロードされた時間をキャプチャすることです。ただし、タイムスタンプは、 COPY_HISTORY関数 (Information Schema)または COPY_HISTORYビュー (Account Usage)によって返されるLOAD_TIME列の値よりも前です。その理由は、 CURRENT_TIMESTAMP は、記録がテーブルに挿入されたとき(つまり、ロード操作のトランザクションがコミットされたとき)ではなく、クラウドサービスでロード操作がコンパイルされたときに評価されるためです。

Error: Integration {0} associated with the stage {1} cannot be found

外部ステージからデータをロードすると、次のようなエラーが発生する場合があります。

003139=SQL compilation error:\nIntegration ''{0}'' associated with the stage ''{1}'' cannot be found.

このエラーは、外部ステージと、ステージにリンクされているストレージ統合との関連付けが壊れている場合に発生する可能性があります。これは、ストレージ統合オブジェクトが再作成されたときに発生します( CREATE OR REPLACE STORAGE INTEGRATION を使用)。ステージは、ストレージ統合の名前ではなく、非表示の ID を使用してストレージ統合にリンクします。バックグラウンドでは、 CREATE OR REPLACE 構文はオブジェクトをドロップし、別の非表示の IDでオブジェクトを再作成します。

1つ以上のステージにリンクされた後にストレージ統合を再作成する必要がある場合は、 ALTER STAGE ステージ名 SET STORAGE_INTEGRATION = ストレージ統合名 を実行して、各ステージとストレージ統合間の関連付けを再確立する必要があります。

  • ステージ名 は、ステージの名前です。

  • ストレージ統合名 は、ストレージ統合の名前です。