Google Cloud Storage用Snowpipeの自動化

このトピックでは、Google Cloud Storage(GCS)イベントの Google Cloud Pub/Sub メッセージを使用して、Snowpipeデータのロードを自動的にトリガーする手順について説明します。

OBJECT_FINALIZE イベントのみが、Snowpipeをトリガーしてファイルをロードします。Snowflakeは、コスト、イベントノイズ、遅延を低減するために、Snowpipeでサポートされているイベントのみを送信することをお勧めします。

このトピックの内容:

クラウドプラットフォームのサポート

GCS Pub / Subイベントメッセージを使用した自動Snowpipeデータロードのトリガーは、 サポートされているすべてのクラウドプラットフォーム でホストされているSnowflakeアカウントでサポートされています。

クラウドストレージへの安全なアクセスの構成

注釈

データファイルを保存する GCS バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。

このセクションでは、Snowflakeストレージ統合オブジェクトを構成して、クラウドストレージの認証責任をSnowflake IDおよびアクセス管理(IAM)エンティティに委任する方法について説明します。

このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、Cloud Storage)ステージで参照されるGoogle Cloud Storageバケットとデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスSnowflakeオブジェクトであり、秘密キーやアクセストークンなどの明示的なクラウドプロバイダー認証情報を渡す必要がありません。代わりに、統合オブジェクトはCloud Storageサービスアカウントを参照します。組織の管理者は、Cloud Storageアカウントのサービスアカウントのアクセス許可を付与します。

管理者は、統合を使用する外部ステージによってアクセスされるCloud Storageバケット(およびオプションのパス)の特定のセットにユーザーを制限することもできます。

注釈

このセクションの手順を完了するには、プロジェクトエディターとしてCloud Storageプロジェクトにアクセスする必要があります。プロジェクトエディターでない場合は、Cloud Storage管理者にこれらのタスクを実行するよう依頼してください。

次の図は、Cloud Storageステージの統合フローを示しています。

Google Cloud Storage Stage Integration Flow
  1. 外部(つまり、Cloud Storage)ステージは、その定義でストレージ統合オブジェクトを参照します。

  2. Snowflakeは、ストレージ統合をアカウント用に作成されたCloud Storageサービスアカウントに自動的に関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS ストレージ統合によって参照される単一のサービスアカウントを作成します。

  3. Cloud Storageプロジェクトのプロジェクトエディターは、ステージ定義で参照されているバケットにアクセスするためのアクセス許可をサービスアカウントに付与します。多数の外部ステージオブジェクトが、異なるバケットとパスを参照し、認証に同じ統合を使用できます。

ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケットのサービスアカウントに付与された権限を確認します。

このセクションの内容:

ステップ1:Snowflakeでクラウドストレージ統合を作成する

CREATE STORAGE INTEGRATION コマンドを使用して統合を作成します。統合は、外部クラウドストレージの認証責任をSnowflakeが生成したエンティティ(つまり、Cloud Storageサービスアカウント)に委任するSnowflakeオブジェクトです。Cloud Storageバケットにアクセスするために、Snowflakeは、データファイルを保存するバケットへのアクセス許可を付与できるサービスアカウントを作成します。

単一のストレージ統合により、複数の外部(つまり、 GCS)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定された GCS バケット(およびオプションのパス)に合わせる必要があります。

注釈

この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
Copy

条件:

  • integration_name は、新しい統合の名前です。

  • bucket は、データファイルを保存するCloud Storageバケットの名前です(例: mybucket)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。

  • path は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。

次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。

この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
Copy

ステップ2:Snowflakeアカウント用のCloud Storage Serviceアカウントを取得する

DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成されたCloud Storageサービスアカウントの ID を取得します。

DESC STORAGE INTEGRATION <integration_name>;
Copy

条件:

例:

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
Copy

出力の STORAGE_GCP_SERVICE_ACCOUNT プロパティは、Snowflakeアカウント用に作成されたCloud Storageサービスアカウントを示しています(例: service-account-id@project1-123456.iam.gserviceaccount.com)。Snowflakeアカウント全体に、単一のCloud Storageサービスアカウントをプロビジョニングします。すべてのCloud Storage統合で、そのサービスアカウントを使用します。

ステップ3:バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する

次の詳細な手順では、Cloud Storageバケットを使用してデータをロードおよびアンロードできるようにするために、Google Cloud Platform ConsoleでSnowflake用の IAM アクセス許可を構成する方法を説明します。

カスタム IAM ロールの作成

バケットにアクセスしてオブジェクトを取得するために必要な権限を持つカスタムロールを作成します。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 IAM & admin » Roles を選択します。

  3. Create Role をクリックします。

  4. ロールの名前と説明を入力します。

  5. Add Permissions をクリックします。

  6. 権限のリストをフィルターし、リストから次を追加します。

    アクション

    必要な権限

    データのロードのみ

    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    パージオプションありのデータのロード、ステージ上で REMOVE コマンドを実行。

    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    データのロードとアンロード

    • storage.buckets.get (データ転送コスト計算用)

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    データのアンロードのみ

    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.list

  7. Create をクリックします。

Cloud Storage Serviceアカウントへのカスタムロールの割り当て

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Cloud Storage » Browser を選択します。

    Bucket List in Google Cloud Platform Console
  3. アクセス用に構成するバケットを選択します。

  4. 右上隅の SHOW INFO PANEL をクリックします。バケットの情報パネルがスライドアウトします。

  5. ADD PRINCIPAL ボタンをクリックします。

  6. New principals フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。

    Bucket Information Panel in Google Cloud Platform Console
  7. Select a role ドロップダウンから、 Custom » <ロール> を選択します。 <ロール> は、 カスタム IAM ロールの作成 (このトピック内)で作成したカスタムクラウドストレージロールです。

  8. Save ボタンをクリックします。サービスアカウント名は、情報パネルの Storage Object Viewer ロールドロップダウンに追加されます。

    Storage Object Viewer role list in Google Cloud Platform Console

Cloud Key Management Serviceの暗号化キーに対するCloud Storageサービスアカウント権限の付与

注釈

この手順は、Google Cloud Key Management Service(Cloud KMS)に保存されているキーを使用して GCS バケットが暗号化されている 場合にのみ 必要です。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Security » Cryptographic keys を選択します。

  3. GCS バケットに割り当てられているキーリングを選択します。

  4. 右上隅の SHOW INFO PANEL をクリックします。キーリングの情報パネルがスライドアウトします。

  5. ADD PRINCIPAL ボタンをクリックします。

  6. New principals フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。

  7. Select a role ドロップダウンから、 Cloud KMS CrytoKey Encryptor/Decryptor ロールを選択します。

  8. Save ボタンをクリックします。サービスアカウント名は、情報パネルの Cloud KMS CrytoKey Encryptor/Decryptor ロールドロップダウンに追加されます。

GCS Pub/Subを使用した自動化の構成

前提条件

このトピックの説明では、次のアイテムが作成および構成されていることを前提としています。

GCP アカウント
  • GCS バケットからイベントメッセージを受信するPub/Subトピック。詳細については、 Pub/Subトピックの作成 (このトピック内)をご参照ください。

  • Pub/Subトピックからイベントメッセージを受信するサブスクリプション。詳細については、このトピックの Pub/Subサブスクリプションの作成 をご参照ください。

手順については、 Pub/Subのドキュメント をご参照ください。

Snowflake
  • データをロードするSnowflakeデータベース内のターゲットテーブル。

Pub/Subトピックの作成

Cloud Shell または Cloud SDK を使用して、Pub/Subトピックを作成します。

次のコマンドを実行してトピックを作成し、指定した GCS バケット内のアクティビティをリッスンできるようにします。

$ gsutil notification create -t <topic> -f json gs://<bucket-name> -e OBJECT_FINALIZE
Copy

条件:

  • <トピック> はトピックの名前です。

  • <バケット名> は、 GCS バケットの名前です。

トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、コマンドにより新しいトピックが作成されます。

詳細については、Pub/Subドキュメントの Cloud StorageのPub/Sub通知の使用 をご参照ください。

Pub/Subサブスクリプションの作成

Cloud Console、 gcloud コマンドラインツール、またはCloud Pub/Sub API を使用して、Pub/Subトピックへのプル配信があるサブスクリプションを作成します。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。

注釈

  • Snowflakeでは、デフォルトのプル配信を使用するPub / Subサブスクリプションのみがサポートされています。プッシュ配信はサポートされていません。

Pub/Subサブスクリプション ID の取得

これらの手順では、Pub/Subトピックサブスクリプション ID を使用して、Snowflakeがイベントメッセージにアクセスできるようにします。

  1. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  2. ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。

  3. トピックサブスクリプションの Subscription ID 列の ID をコピーします。

ステップ1: Snowflakeで通知統合を作成する

CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。通知統合は、Pub/Subサブスクリプションを参照します。Snowflakeは、通知統合をアカウント用に作成された GCS サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS 通知統合によって参照される単一のサービスアカウントを作成します。

注釈

  • この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

  • 通知統合用の GCS サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。

  • 単一の通知統合は、単一のGoogle Cloud Pub/Subサブスクリプションをサポートします。イベント通知は通知の統合間で分割されるため、複数の通知統合で同一のPub/Subサブスクリプションを参照すると、ターゲットテーブルのデータが失われる可能性があります。

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
Copy

条件:

例:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
Copy

ステップ2: Pub/SubサブスクリプションにSnowflakeアクセスを許可する

  1. DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    条件:

    例:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  4. ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。

  5. アクセス用に構成するサブスクリプションを選択します。

  6. 右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。

  7. ADD PRINCIPAL ボタンをクリックします。

  8. New principals フィールドで、記録したサービスアカウント名を検索します。

  9. Select a role ドロップダウンから、 Pub/Sub Subscriber を選択します。

  10. Save ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Subscriber ロールドロップダウンに追加されます。

  11. Cloud Consoleの Dashboard ページに移動し、ドロップダウンリストからプロジェクトを選択します。

  12. ADD PEOPLE TO THIS PROJECT ボタンをクリックします。

  13. 記録したサービスアカウント名を追加します。

  14. Select a role ドロップダウンから、 Monitoring Viewer を選択します。

  15. Save ボタンをクリックします。サービスアカウント名は、 Monitoring Viewer ロールドロップダウンに追加されます。

ステップ3: ステージを作成する(必要な場合)

CREATE STAGE コマンドを使用して、 GCS バケットを参照する外部ステージを作成します。Snowflakeは、ステージングされたデータファイルを外部テーブルメタデータに読み取ります。または、既存の外部ステージを使用できます。

注釈

  • クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。

  • CREATE STAGE ステートメントでストレージ統合を参照するには、ロールにストレージ統合オブジェクトに対する USAGE 権限が必要です。

次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは my_storage_int という名前のストレージ統合を参照します。

USE SCHEMA mydb.public;

CREATE STAGE mystage
  URL='gcs://load/files/'
  STORAGE_INTEGRATION = my_storage_int;
Copy

ステップ4: 自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<notification_integration_name>'
  AS
<copy_statement>;
Copy

条件:

<パイプ名>

パイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。

識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例: "My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。

INTEGRATION = '<通知統合名>'

Google Cloud Pub/Sub通知を使用して、ディレクトリテーブルのメタデータを自動的に更新するために使用される通知統合の名前。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。

copy_statement

キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。

たとえば、 snowpipe_db.public スキーマにパイプを作成し、 mystage という外部(GCS)ステージにステージングされたファイルから、 mytable という宛先テーブルにデータをロードします。

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MY_NOTIFICATION_INT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;
Copy

INTEGRATION パラメーターは、 ステップ1: Snowflakeでクラウドストレージ統合を作成する で作成した my_notification_int 通知統合を参照します。統合名はすべて大文字で提供する必要があります。

重要

COPY INTO <テーブル> ステートメントの保存場所の参照が、アカウントにある既存のパイプの参照と重複していないことを確認します。そうしないと、複数のパイプが、同じデータファイルのセットをターゲットテーブルにロードする可能性があります。たとえば、この状況は、複数のパイプ定義が <ストレージの場所>/path1/<ストレージの場所>/path1/path2/ など、細分性のレベルが異なる同じストレージの場所を参照している場合に発生する可能性があります。この例では、ファイルが <ストレージの場所>/path1/path2/ でステージングされている場合は、両方のパイプがファイルのコピーをロードします。

SHOW PIPES を実行するか、Account Usageの PIPES ビューまたはInformation Schemaの PIPES ビューのいずれかをクエリして、アカウントにあるパイプの定義すべての COPY INTO <テーブル> ステートメントを表示します。

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルが GCS バケットに追加されると、イベントメッセージはSnowpipeに通知して、パイプで定義されたターゲットテーブルにそれらをロードします。

ステップ5:履歴ファイルをロードする

Pub/Subメッセージが構成される に外部ステージに存在したデータファイルのバックログをロードするには、 ALTER PIPE ... REFRESH ステートメントを実行します。

ステップ6: ステージングされたファイルを削除する

データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。

SYSTEM$PIPE_STATUS 出力

SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。

TRUEに設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
Copy

出力値の説明については、 SQL 関数の参照トピックをご参照ください。