Snowpipeの管理

このトピックでは、Snowpipeの管理に関連する管理タスクについて説明します。

このトピックの内容:

Snowpipeがデータをロードした後のステージングされたファイルの削除

パイプオブジェクトは、 PURGE コピーオプションをサポートしていません。Snowpipeは、データがテーブルに正常にロードされたときに、ステージングされたファイルを自動的に削除できません。

不要になったステージングされたファイルを削除するには、 REMOVE コマンドを定期的に実行してファイルを削除することをお勧めします。

または、クラウドストレージサービスプロバイダーが提供するライフサイクル管理機能を構成します。

履歴データのロード

注釈

このセクションの情報は、イベント通知を使用した自動データロードに関連します。Snowpipe REST APIを呼び出すと、追加の手順を実行せずに履歴データをロードできます。

ALTER PIPE ... REFRESH ステートメントは、ターゲットテーブルにロードするために、過去7日間にステージングされたデータファイルのセットをSnowpipeの取り込みキューにコピーします。以前にステージングされたファイルからデータをロードする場合は、次の手順をお勧めします。

  1. COPY INTO <テーブル> ステートメントを実行して、履歴データをターゲットテーブルにロードします。

  2. Snowpipeとイベント通知を使用して、自動データロードを構成します。新しくステージングされたファイルは、ターゲットテーブルへの取り込みのイベント通知をトリガーします。履歴データファイルはイベント通知をトリガーしないため、2回ロードされません。

    手順については、次をご参照ください。

    Amazon S3

    Amazon S3用Snowpipeの自動化

    Google Cloud Storage

    Google Cloud Storage用Snowpipeの自動化

    Microsoft Azure

    Microsoft Azure BLOBストレージ用Snowpipeの自動化

  3. ALTER PIPE ... REFRESH ステートメントを実行し、ステップ1と2の間にステージングされたファイルをキューに入れます。このステートメントは、ターゲットテーブルとパイプの両方のロード履歴をチェックして、同じファイルが2回ロードされないようにします。

パイプの再作成

ほとんどのパイププロパティを変更(CREATE OR REPLACE PIPE ステートメントを使用)するには、パイプを再作成する必要があります。

このセクションでは、パイプを再作成する際に従うべき考慮事項とベストプラクティスについて説明します。

自動データロード用のパイプの再作成

イベント通知を使用してデータのロードを自動化するパイプを再作成するときは、次のステップを完了することをお勧めします。

  1. パイプを一時停止します(ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = true を使用)。

  2. SYSTEM$PIPE_STATUS 関数をクエリし、パイプの実行状態が PAUSED であることを確認します。

  3. パイプを再作成します( CREATE OR REPLACE PIPE を使用)。

  4. パイプをもう一度一時停止します。

  5. クラウドメッセージングサービスの構成手順を確認して、設定が依然として正確であることを確認します。

  6. パイプを再開します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = falseを使用)。

  7. SYSTEM$PIPE_STATUS 関数を再度クエリし、パイプの実行状態が RUNNING であることを確認します。

ロード履歴

Snowpipe操作のロード履歴は、パイプオブジェクトのメタデータに保存されます。パイプが再作成されると、ロード履歴はドロップされます。一般に、この条件は、ユーザーが後ほどパイプで ALTER PIPE ... REFRESH ステートメントを実行した場合にのみ影響します。こうすると、データがすでに正常にロードされ、ファイルがその後削除されなかった場合に、パイプの保存場所にあるステージングされたファイルから重複データをロードする可能性があります。

参照ステージのクラウドパラメーターの変更

外部ステージのクラウドパラメーターには以下が含まれます。

  • URL

  • STORAGE_INTEGRATION

  • ENCRYPTION

Snowpipeが正常に構成された後、参照ステージのクラウドパラメーターのいずれかを変更する必要がある場合は、次の手順を完了することをお勧めします。

  1. パイプを一時停止します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = true を使用)。現在キューに入っているファイルがターゲットテーブルにロードされるのを待ちます。

  2. 必要に応じてステージパラメーターを変更します( ALTER STAGE を使用)。

  3. パイプを再開します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = false を使用)。

パイプはトランザクションではないため、これらの手順により、Snowpipeは最新のステージパラメーター値を使用してファイルをキューに入れます。

警告

ステージの URL パラメーターを変更すると、クラウドメッセージングを利用する依存パイプがデータのロード(つまり、 AUTO_INGEST = TRUE)をトリガーして動作を停止する可能性があります。

パイプの所有権の譲渡

次の手順を実行して、パイプの所有権を譲渡します。

  1. PIPE_EXECUTION_PAUSED パラメーターを TRUEに設定します。

    このパラメーターにより、パイプの一時停止または再開が可能になります。このパラメーターは、次のレベルでサポートされています。

    • アカウント

    • スキーマ

    • パイプ

    パイプレベルでは、オブジェクト所有者(またはロール階層の親ロール)がパラメーターを設定して、個々のパイプを一時停止または再開できます。

    アカウント管理者(ACCOUNTADMIN ロールを持つユーザー)は、アカウントレベルでこのパラメーターを設定して、アカウント内のすべてのパイプを一時停止または再開できます。同様に、スキーマに対する MODIFY 権限を持つユーザーは、スキーマレベルでパイプを一時停止または再開できます。この大きなドメインコントロールは、パラメーターが低いレベル(例: オブジェクトレベルの所有者)でまだ設定されていないパイプにのみ影響します。

  2. GRANT OWNERSHIP を使用してパイプの所有権を譲渡します。

  3. パイプを強制的に再開します( SYSTEM$PIPE_FORCE_RESUME を使用)。

    この手順により、新しい所有者はパイプステータスを評価し、 SYSTEM$PIPE_STATUS を使用してロードを待機しているデータファイルの数を判断できます。ターゲットテーブルへのロードが認証されたファイルのみがキューに入れられていることを確認することをお勧めします。

パイプ定義の COPY ステートメントの変更

たとえば、ターゲットテーブルに列が追加された場合に、パイプ定義の COPY ステートメントを変更するには、次の手順を実行します。

このセクションのコマンドを実行するには、ユーザーの現在のロールで、パイプに対する OWNERSHIP 権限が必要です。

  1. パイプを一時停止します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED=true を使用)。

  2. SYSTEM$PIPE_STATUS 関数をクエリし、パイプの実行状態が PAUSED であり、保留中のファイル数が0であることを確認します。

  3. パイプを再作成して、定義内の COPY ステートメントを変更します。次のオプションの どちらか を選択します。

    • パイプをドロップし( DROP PIPE を使用)、パイプを作成します(CREATE PIPE を使用)。

    • パイプを再作成します(CREATE OR REPLACE PIPE 構文を使用)。内部的に、パイプはドロップされて作成されます。

  4. パイプをもう一度一時停止します。

  5. クラウドメッセージングサービスの構成手順を確認して、設定が依然として正確であることを確認します。

  6. パイプを再開します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = falseを使用)。

  7. SYSTEM$PIPE_STATUS 関数を再度クエリし、パイプの実行状態が RUNNING であることを確認します。

注釈

ファイルロードメタデータは、テーブルではなく、 パイプオブジェクト に関連付けられています。パイプを再作成すると、ロードされたファイルの履歴が削除されます。Snowpipeによって既にロードされているファイルが誤ってパイプに再送信され、ターゲットテーブルに再度ロードされないようにします。テーブルのクエリ履歴を表示するには、 COPY_HISTORY 関数をクエリします。

古いパイプを再開する

注釈

このセクションは、データロードをトリガーするためにクラウドメッセージングを利用するパイプオブジェクト(つまり、パイプ定義が AUTO_INGEST = TRUE のもの)のみに関連しています。

パイプが一時停止されると、パイプに対して受信されたイベントメッセージは制限された保持期間に入ります。期間はデフォルトで14日です。パイプが14日より長く一時停止されている場合、パイプは古くなっていると見なされます。

古いパイプを再開するには、権限を持つロールが SYSTEM$PIPE_FORCE_RESUME 関数を呼び出し、STALENESS_CHECK_OVERRIDE引数を入力する必要があります。この引数は、ロールが古いパイプを再開していることを理解していることを示しています。

たとえば、 mydb.myschema データベースとスキーマの古い stalepipe1 パイプを再開します。

SELECT SYSTEM$PIPE_FORCE_RESUME('mydb.myschema.stalepipe1','staleness_check_override');
Copy

古いパイプが一時停止されているときに、パイプの所有権が別のロールに譲渡された場合、パイプを再開するには、追加のOWNERSHIP_TRANSFER_CHECK_OVERRIDE引数が必要です。たとえば、新しいロールに譲渡された mydb.myschema データベースとスキーマの古い stalepipe2 パイプを再開します。

SELECT SYSTEM$PIPE_FORCE_RESUME('mydb.myschema.stalepipe1','staleness_check_override, ownership_transfer_check_override');
Copy

パイプが一時停止している間に受信したイベント通知が制限された保持期間の終わりに達すると、Snowflakeはそれを内部メタデータから削除するようにスケジュールします。パイプが後で再開された場合、Snowpipeはこれらの古い通知をベストエフォートベーシスで処理します。Snowflakeはそれらが処理されることを保証できません。

たとえば、パイプが一時停止されてから15日後に再開された場合、Snowpipeは通常、パイプが一時停止された最初の日に受信されたイベント通知をスキップします(つまり、現在は14日以上経過しています)。パイプが一時停止されてから16日後に再開された場合、Snowpipeは通常、パイプが一時停止されてから1日目と2日目に受信したイベント通知をスキップする、などです。