Google Cloud Storage用Snowpipeの自動化

このトピックでは、Google Cloud Storage(GCS)イベントの Google Cloud Pub/Sub メッセージを使用して、Snowpipeデータのロードを自動的にトリガーする方法について説明します。

注釈

この機能は、Amazon Web Services(AWS)またはGoogle Cloud PlatformでホストされているSnowflakeアカウントに限定されています。GCS Pub/Subメッセージを使用してSnowpipeデータのロードを自動化する手順は、どちらのクラウドホスティングプラットフォームのアカウントでも同じです。

GCS から AWS でホストされているSnowflakeアカウントに対するSnowpipeデータロードの自動化のサポートは、 プレビュー機能 として提供されます。

このトピックの内容:

クラウドストレージへの安全なアクセスの構成

注釈

データファイルを保存する GCS バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。

このセクションでは、Snowflakeストレージ統合オブジェクトを構成して、クラウドストレージの認証責任をSnowflake IDおよびアクセス管理(IAM)エンティティに委任する方法について説明します。

このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、Cloud Storage)ステージで参照されるGoogle Cloud Storageバケットとデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスSnowflakeオブジェクトであり、秘密キーやアクセストークンなどの明示的なクラウドプロバイダー認証情報を渡す必要がありません。代わりに、統合オブジェクトはCloud Storageサービスアカウントを参照します。組織の管理者は、Cloud Storageアカウントのサービスアカウントのアクセス許可を付与します。

管理者は、統合を使用する外部ステージによってアクセスされるCloud Storageバケット(およびオプションのパス)の特定のセットにユーザーを制限することもできます。

注釈

このセクションの手順を完了するには、プロジェクトエディターとしてCloud Storageプロジェクトにアクセスする必要があります。プロジェクトエディターでない場合は、Cloud Storage管理者にこれらのタスクを実行するよう依頼してください。

次の図は、Cloud Storageステージの統合フローを示しています。

Google Cloud Storage Stage Integration Flow
  1. 外部(つまり、Cloud Storage)ステージは、その定義でストレージ統合オブジェクトを参照します。

  2. Snowflakeは、ストレージ統合をアカウント用に作成されたCloud Storageサービスアカウントに自動的に関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS ストレージ統合によって参照される単一のサービスアカウントを作成します。

  3. Cloud Storageプロジェクトのプロジェクトエディターは、ステージ定義で参照されているバケットにアクセスするためのアクセス許可をサービスアカウントに付与します。多数の外部ステージオブジェクトが、異なるバケットとパスを参照し、認証に同じ統合を使用できます。

ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケットのサービスアカウントに付与された権限を確認します。

このセクションの内容:

ステップ1:Snowflakeでクラウドストレージ統合を作成する

CREATE STORAGE INTEGRATION コマンドを使用して統合を作成します。統合は、外部クラウドストレージの認証責任をSnowflakeが生成したエンティティ(つまり、Cloud Storageサービスアカウント)に委任するSnowflakeオブジェクトです。Cloud Storageバケットにアクセスするために、Snowflakeは、データファイルを保存するバケットへのアクセス許可を付与できるサービスアカウントを作成します。

単一のストレージ統合により、複数の外部(つまり、 GCS)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定された GCS バケット(およびオプションのパス)に合わせる必要があります。

注釈

この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]

条件:

  • 統合名 は、新しい統合の名前です。

  • バケット は、データファイルを保存するCloud Storageバケットの名前です(例: mybucket)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。

  • パス は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。

次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。

この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');

ステップ2:Snowflakeアカウント用のCloud Storage Serviceアカウントを取得する

DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成されたCloud Storageサービスアカウントの ID を取得します。

DESC STORAGE INTEGRATION <integration_name>;

条件:

例:

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+

出力の STORAGE_GCP_SERVICE_ACCOUNT プロパティは、Snowflakeアカウント用に作成されたCloud Storageサービスアカウントを示しています(例: service-account-id@project1-123456.iam.gserviceaccount.com)。Snowflakeアカウント全体に、単一のCloud Storageサービスアカウントをプロビジョニングします。すべてのCloud Storage統合で、そのサービスアカウントを使用します。

ステップ3:バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する

次の詳細な手順では、Cloud Storageバケットを使用してデータをロードおよびアンロードできるようにするために、Google Cloud Platform ConsoleでSnowflake用の IAM アクセス許可を構成する方法を説明します。

カスタム IAM ロールの作成

バケットにアクセスしてオブジェクトを取得するために必要な権限を持つカスタムロールを作成します。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 IAM & admin » Roles を選択します。

  3. Create Role をクリックします。

  4. ロールの名前と説明を入力します。

  5. Add Permissions をクリックします。

  6. 権限のリストをフィルターし、リストから次を追加します。

    データのロードのみ
    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    パージオプションを使用したデータのロード
    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    データのロードとアンロード
    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

  7. Create をクリックします。

Cloud Storage Serviceアカウントへのカスタムロールの割り当て

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Storage » Browser を選択します。

    Bucket List in Google Cloud Platform Console
  3. アクセス用に構成するバケットを選択します。

  4. 右上隅の SHOW INFO PANEL をクリックします。バケットの情報パネルがスライドアウトします。

  5. Add members フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。

    Bucket Information Panel in Google Cloud Platform Console
  6. Select a role ドロップダウンから、 Storage » Custom » <ロール> を選択します。 <ロール> は、 カスタム IAM ロールの作成 (このトピック内)で作成したカスタムクラウドストレージロールです。

  7. Add ボタンをクリックします。サービスアカウント名は、情報パネルの Storage Object Viewer ロールドロップダウンに追加されます。

    Storage Object Viewer role list in Google Cloud Platform Console

Cloud Key Management Serviceの暗号化キーに対するCloud Storageサービスアカウント権限の付与

注釈

この手順は、Google Cloud Key Management Service(Cloud KMS)に保存されているキーを使用して GCS バケットが暗号化されている 場合にのみ 必要です。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Security » Cryptographic keys を選択します。

  3. GCS バケットに割り当てられているキーリングを選択します。

  4. 右上隅の SHOW INFO PANEL をクリックします。キーリングの情報パネルがスライドアウトします。

  5. Add members フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。

  6. Select a role ドロップダウンから、 Cloud KMS CrytoKey Encryptor/Decryptor ロールを選択します。

  7. Add ボタンをクリックします。サービスアカウント名は、情報パネルの Cloud KMS CrytoKey Encryptor/Decryptor ロールドロップダウンに追加されます。

GCS Pub/Subを使用した自動Snowpipeの構成

前提条件

このトピックの説明では、次のアイテムが作成および構成されていることを前提としています。

GCP アカウント
  • GCS バケットからイベントメッセージを受信するPub/Subトピック。詳細については、 Pub/Subトピックの作成 (このトピック内)をご参照ください。

  • Pub/Subトピックからイベントメッセージを受信するサブスクリプション。詳細については、このトピックの Pub/Subサブスクリプションの作成 をご参照ください。

手順については、 Pub/Subのドキュメント をご参照ください。

Snowflake
  • データがロードされるSnowflakeデータベースのターゲットテーブル。

Pub/Subトピックの作成

Cloud Shell または Cloud SDK を使用して、Pub/Subトピックを作成します。

次のコマンドを実行してトピックを作成し、指定した GCS バケット内のアクティビティをリッスンできるようにします。

$ gsutil notification create -t <topic> -f json gs://<bucket-name>

条件:

  • <トピック> はトピックの名前です。

  • <バケット名> は、 GCS バケットの名前です。

トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、新しいトピックが作成されます。

詳細については、Pub/Subドキュメントの Cloud StorageのPub/Sub通知の使用 をご参照ください。

Pub/Subサブスクリプションの作成

Cloud Console、 gcloud コマンドラインツール、またはCloud Pub/Sub API を使用して、Pub/Subトピックへのサブスクリプションを作成します。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。

Pub/Subサブスクリプション ID の取得

これらの手順では、Pub/Subトピックサブスクリプション ID を使用して、Snowflakeがイベントメッセージにアクセスできるようにします。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。

  3. トピックサブスクリプションの Subscription ID 列の ID をコピーします

ステップ1: Snowflakeで通知統合を作成する

CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。通知統合は、Pub/Subサブスクリプションを参照します。Snowflakeは、通知統合をアカウント用に作成された GCS サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS 通知統合によって参照される単一のサービスアカウントを作成します。

注釈

  • この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

  • 通知統合用の GCS サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';

条件:

例:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';

注釈

現在、 ALTER NOTIFICATION INTEGRATION コマンドは、 GCP_PUBSUB_SUBSCRIPTION_NAME パラメーター値の変更をサポートしていません。誤ったパラメーター値が指定されている場合は、通知統合を再作成する必要があります( CREATE OR REPLACE NOTIFICATION INTEGRATION を使用)。

ステップ2:Pub/SubサブスクリプションにSnowflakeアクセスを許可する

  1. DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。

    DESC NOTIFICATION INTEGRATION <integration_name>;
    

    条件:

    例:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
  2. GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。

    <service_account>@<project_id>.iam.gserviceaccount.com
    
  3. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  4. ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。

  5. アクセス用に構成するサブスクリプションを選択します。

  6. 右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。

  7. Add members フィールドで、記録したサービスアカウント名を検索します。

  8. Select a role ドロップダウンから、 Pub/Sub Subscriber を選択します。

  9. Add ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Subscriber ロールドロップダウンに追加されます。

ステップ3:自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<integration_name>'
  AS
<copy_statement>;

条件:

  • <パイプ名> は、パイプの識別子です。パイプが作成されるスキーマに対して一意である必要があります。

    識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例: "My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。

  • <統合名> は、 ステップ1:Snowflakeでクラウドストレージ統合を作成する で作成した通知統合の名前です。

  • コピーステートメント は、キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメントです。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。

たとえば、 snowpipe_db.public スキーマにパイプを作成し、 mystage という外部(GCS)ステージにステージングされたファイルから、 mytable という宛先テーブルにデータをロードします。

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MYINT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;

重要

パイプ定義のステージ参照を既存のパイプと比較します。同じ GCS バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが gcs://mybucket1/path1gcs://mybucket1/path1/path2 などの、異なるレベルの細分性で同じ GCS バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが gcs://mybucket1/path1/path2 にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。

これは、Snowpipeの手動設定(自動インジェストは 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはPub/Subメッセージから生成されたファイルリストを受け取ります。データの重複を避けるため、特に注意が必要です。

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルが GCS バケットに追加されると、イベントメッセージはSnowpipeに通知して、パイプで定義されたターゲットテーブルにそれらをロードします。

ステップ4:履歴ファイルをロードする

Pub/Subメッセージが構成される に外部ステージに存在したデータファイルのバックログをロードするには、 ALTER PIPE ... REFRESH ステートメントを実行します。

SYSTEM$PIPE_STATUS 出力

SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。

TRUE に設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}

条件:

executionState

パイプの現在の実行状態は、次のいずれかです。

  • RUNNING (つまり、すべてが正常です。Snowflakeはこのパイプのファイルをアクティブに処理する場合としない場合があります)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

現在キューに入れられているデータファイルの中で最も早いタイムスタンプ(該当する場合)。ファイルがキューに追加されるときにタイムスタンプが設定されます。

pendingFileCount

現在パイプによって処理されているファイルの数。パイプが一時停止されている場合、パイプが一時停止される 前に キューに入れられたファイルが処理されるため、この値は減少します。この値が 0 の場合、このパイプのキューに入れられているファイルがないか、パイプが事実上一時停止しています。

notificationChannelName

パイプに関連付けられた GCS ストレージキュー。

numOutstandingMessagesOnChannel

キューに入れられたがまだ受信されていないストレージキュー内のメッセージの数。

lastReceivedMessageTimestamp

ストレージキューから受信した最後のメッセージのタイムスタンプ。このメッセージが特定のパイプに適用されない場合があります(例:メッセージに関連付けられたパス/プレフィックスが、パイプ定義のパス/プレフィックスと一致しない場合)。また、作成されたデータオブジェクトによってトリガーされたメッセージのみが、自動インジェストパイプによって消費されます。

lastForwardedMessageTimestamp

パイプに転送された一致するパス/プレフィックスを持つ最後の「オブジェクトの作成」イベントメッセージのタイムスタンプ。

error

パイプが実行のために最後にコンパイルされたときに生成されたエラーメッセージ(該当する場合)。多くの場合、権限の問題またはオブジェクトの削除により、必要なオブジェクト(テーブル、ステージ、ファイル形式)へのアクセスに問題が生じたことが原因です。

fault

最新の内部Snowflakeプロセスエラー(該当する場合)。主にSnowflakeがデバッグ目的で使用します。