Google Cloud Storage用Snowpipeの自動化¶
このトピックでは、Google Cloud Storage (GCS) イベント用の Google Cloud Pub/Sub メッセージを使用して、Google Cloud Storage 上の外部ステージから Snowpipe のデータ読み込みを自動的にトリガーする手順を説明します。
OBJECT_FINALIZE イベントのみが、Snowpipeをトリガーしてファイルをロードします。Snowflakeは、コスト、イベントノイズ、遅延を低減するために、Snowpipeでサポートされているイベントのみを送信することをお勧めします。
クラウドプラットフォームのサポート¶
GCS Pub / Subイベントメッセージを使用した自動Snowpipeデータロードのトリガーは、 サポートされているすべてのクラウドプラットフォーム でホストされているSnowflakeアカウントでサポートされています。
クラウドストレージへの安全なアクセスの構成¶
注釈
データファイルを保存する GCS バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。
このセクションでは、Snowflakeストレージ統合オブジェクトを構成して、クラウドストレージの認証責任をSnowflake IDおよびアクセス管理(IAM)エンティティに委任する方法について説明します。
このセクションでは、ストレージ統合を使用して、Snowflakeが外部 (つまりクラウドストレージ) ステージで参照されるGoogle Cloud Storageバケットからデータを読み取り、バケットに書き込む方法について説明します。統合は、名前付きのファーストクラスSnowflakeオブジェクトであり、秘密キーやアクセストークンなどの明示的なクラウドプロバイダー認証情報を渡す必要がありません。代わりに、統合オブジェクトはCloud Storageサービスアカウントを参照します。組織の管理者は、Cloud Storageアカウントのサービスアカウントのアクセス許可を付与します。
管理者は、統合を使用する外部ステージによってアクセスされるCloud Storageバケット(およびオプションのパス)の特定のセットにユーザーを制限することもできます。
注釈
- このセクションの手順を完了するには、プロジェクトエディターとしてCloud Storageプロジェクトにアクセスする必要があります。プロジェクトエディターでない場合は、Cloud Storage管理者にこれらのタスクを実行するよう依頼してください。 
- Snowflakeが、ストレージがホストされているGoogle Cloud Storageリージョンをサポートしていることを確認します。詳細については、 サポートされているクラウドリージョン をご参照ください。 
次の図は、Cloud Storageステージの統合フローを示しています。
 
- 外部 (つまりクラウドストレージ) ステージは、その定義でストレージ統合オブジェクトを参照します。 
- Snowflakeは、ストレージ統合をアカウント用に作成されたCloud Storageサービスアカウントに自動的に関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS ストレージ統合によって参照される単一のサービスアカウントを作成します。 
- Cloud Storageプロジェクトのプロジェクトエディターは、ステージ定義で参照されているバケットにアクセスするためのアクセス許可をサービスアカウントに付与します。多数の外部ステージオブジェクトが、異なるバケットとパスを参照し、認証に同じ統合を使用できます。 
ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケットのサービスアカウントに付与された権限を確認します。
このセクションの内容:
ステップ1:Snowflakeでクラウドストレージ統合を作成する¶
CREATE STORAGE INTEGRATION コマンドを使用して統合を作成します。統合は、外部クラウドストレージの認証責任をSnowflakeが生成したエンティティ (つまり、クラウドストレージサービスアカウント) に委譲するSnowflakeオブジェクトです。Cloud Storageバケットにアクセスするために、Snowflakeは、データファイルを保存するバケットへのアクセス許可を付与できるサービスアカウントを作成します。
1つのストレージ統合で、複数の外部ステージ (つまり、 GCS) をサポートすることができます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定された GCS バケット(およびオプションのパス)に合わせる必要があります。
注釈
この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。
CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
条件:
- integration_nameは、新しい統合の名前です。
- bucketは、データファイルをストレージするクラウドストレージのバケット名です (例えば、- mybucket)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。
- pathは、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。
次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。
この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。
CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'GCS' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/') STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
ステップ2:Snowflakeアカウント用のCloud Storageサービスアカウントを取得する¶
DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成されたCloud Storageサービスアカウントの ID を取得します。
DESC STORAGE INTEGRATION <integration_name>;
条件:
integration_nameは、 ステップ1:Snowflakeでクラウドストレージ統合を作成する (このトピック内) で作成した統合の名前です。
例:
DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
出力の STORAGE_GCP_SERVICE_ACCOUNT プロパティには、Snowflake アカウントに作成されたクラウドストレージサービスアカウントが表示されます (つまり、 service-account-id@project1-123456.iam.gserviceaccount.com)。Snowflakeアカウント全体に、単一のCloud Storageサービスアカウントをプロビジョニングします。すべてのCloud Storage統合で、そのサービスアカウントを使用します。
ステップ3: バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する¶
次の詳細な手順では、Cloud Storageバケットを使用してデータをロードおよびアンロードできるようにするために、 Google Cloud console でSnowflake用の IAM アクセス許可を構成する方法を説明します。
カスタム IAM ロールの作成¶
バケットにアクセスしてオブジェクトを取得するために必要な権限を持つカスタムロールを作成します。
- プロジェクトエディターとして Google Cloud console にサインインします。 
- ホームダッシュボードから、 IAM & Admin » Roles を選択します。 
- Create Role を選択します。 
- カスタムロールの Title およびオプションの Description を入力します。 
- Add Permissions を選択します。 
- 権限のリストをフィルターし、リストから次を追加します。 - アクション - 必要な権限 - データのロードのみ - storage.buckets.get
- storage.objects.get
- storage.objects.list
 - パージオプションありのデータのロード、ステージ上で REMOVE コマンドを実行。 - storage.buckets.get
- storage.objects.delete
- storage.objects.get
- storage.objects.list
 - データのロードとアンロード - storage.buckets.get(データ転送コスト計算用)
- storage.objects.create
- storage.objects.delete
- storage.objects.get
- storage.objects.list
 - データのアンロードのみ - storage.buckets.get
- storage.objects.create
- storage.objects.delete
- storage.objects.list
 - COPY FILES を使用して外部ステージにファイルをコピーする - 以下の追加権限が必要です。 - storage.multipartUploads.abort
- storage.multipartUploads.create
- storage.multipartUploads.list
- storage.multipartUploads.listParts
 
- Add を選択します。 
- Create を選択します。 
Cloud Storage Serviceアカウントへのカスタムロールの割り当て¶
- プロジェクトエディターとして Google Cloud console にサインインします。 
- ホームダッシュボードから、 Cloud Storage » Buckets を選択します。 
- バケットのリストをフィルタリングし、ストレージ統合の作成時に指定したバケットを選択します。 
- Permissions » View by principals を選択してから、 Grant access を選択します。 
- Add principals の下に、 DESC STORAGE INTEGRATION コマンド出力から取得したサービスアカウント名を貼り付けます。 
- Assign roles で、以前に作成したカスタム IAM ロールを選択し、次に Save を選択します。 
Cloud StorageサービスアカウントにCloud Key Management Serviceの暗号化 キー の権限を付与すること¶
注釈
この手順は、Google Cloud Key Management Service(Cloud KMS)に保存されているキーを使用して GCS バケットが暗号化されている 場合にのみ 必要です。
- プロジェクトエディターとして Google Cloud console にサインインします。 
- ホームダッシュボードから、 Security » Key Management を検索して選択します。 
- GCS バケットに割り当てられているキーリングを選択します。 
- 右上隅の SHOW INFO PANEL をクリックします。キーリングの情報パネルがスライドアウトします。 
- ADD PRINCIPAL ボタンをクリックします。 
- New principals フィールドで、 DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)。 
- Select a role ドロップダウンから、 - Cloud KMS CrytoKey Encryptor/Decryptorロールを選択します。
- Save ボタンをクリックします。サービスアカウント名は、情報パネルの Cloud KMS CrytoKey Encryptor/Decryptor ロールドロップダウンに追加されます。 
注釈
SYSTEM$VALIDATE_STORAGE_INTEGRATION 関数を使用して、ストレージ統合の構成を検証することができます。
GCS Pub/Subを使用した自動化の構成¶
前提条件¶
このトピックの説明では、次のアイテムが作成および構成されていることを前提としています。
- GCP アカウント:
- GCS バケットからイベントメッセージを受信するPub/Subトピック。詳細については、 Pub/Subトピックの作成 (このトピック内)をご参照ください。 
- Pub/Subトピックからイベントメッセージを受信するサブスクリプション。詳細については、このトピックの Pub/Subサブスクリプションの作成 をご参照ください。 
 - 手順については、 Pub/Subのドキュメント をご参照ください。 
- Snowflake:
- データをロードするSnowflakeデータベース内のターゲットテーブル。 
 
Pub/Subトピックの作成¶
Cloud Shell または Cloud SDK を使用して、Pub/Subトピックを作成します。
次のコマンドを実行してトピックを作成し、指定した GCS バケット内のアクティビティをリッスンできるようにします。
$ gsutil notification create -t <topic> -f json -e OBJECT_FINALIZE gs://<bucket-name>
条件:
- <トピック>はトピックの名前です。
- <バケット名>は、 GCS バケットの名前です。
トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、コマンドにより新しいトピックが作成されます。
詳細については、Pub/Subドキュメントの Cloud StorageのPub/Sub通知の使用 をご参照ください。
Pub/Subサブスクリプションの作成¶
Cloud Console、 gcloud コマンドラインツール、またはCloud Pub/Sub API を使用して、Pub/Subトピックへのプル配信があるサブスクリプションを作成します。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。
注釈
- Snowflakeでは、デフォルトのプル配信を使用するPub / Subサブスクリプションのみがサポートされています。プッシュ配信はサポートされていません。 
Pub/Subサブスクリプション ID の取得¶
これらの手順では、Pub/Subトピックサブスクリプション ID を使用して、Snowflakeがイベントメッセージにアクセスできるようにします。
- プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。 
- ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。 
- トピックサブスクリプションの Subscription ID 列の ID をコピーします。 
ステップ1: Snowflakeで通知統合を作成する¶
CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。
通知統合は、Pub/Subサブスクリプションを参照します。Snowflakeは、通知統合をアカウント用に作成された GCS サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS 通知統合によって参照される単一のサービスアカウントを作成します。
注釈
- この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。 
- 通知統合用の GCS サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。 
- 単一の通知統合は、単一のGoogle Cloud Pub/Subサブスクリプションをサポートします。イベント通知は通知の統合間で分割されるため、複数の通知統合で同一のPub/Subサブスクリプションを参照すると、ターゲットテーブルのデータが失われる可能性があります。したがって、パイプが既存のパイプと同じPub/Subサブスクリプションを参照する場合、パイプの作成はブロックされます。 
CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
条件:
- integration_nameは、新しい統合の名前です。
- subscription_idは、 Pub/Subサブスクリプションの取得 ID で記録したサブスクリプション名です。
例:
CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
ステップ2:Pub/SubサブスクリプションにSnowflakeアクセスを許可する¶
- DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。 - DESC NOTIFICATION INTEGRATION <integration_name>; - 条件: - integration_nameは、 ステップ1:Snowflakeで通知統合を作成する で作成した統合の名前です。
 - 例: - DESC NOTIFICATION INTEGRATION my_notification_int; 
- GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。 - <service_account>@<project_id>.iam.gserviceaccount.com 
- プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。 
- ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。 
- アクセス用に構成するサブスクリプションを選択します。 
- 右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。 
- ADD PRINCIPAL ボタンをクリックします。 
- New principals フィールドで、記録したサービスアカウント名を検索します。 
- Select a role ドロップダウンから、 Pub/Sub Subscriber を選択します。 
- Save ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Subscriber ロールドロップダウンに追加されます。 
- Cloud Consoleの Dashboard ページに移動し、ドロップダウンリストからプロジェクトを選択します。 
- ADD PEOPLE TO THIS PROJECT ボタンをクリックします。 
- 記録したサービスアカウント名を追加します。 
- Select a role ドロップダウンから、 Monitoring Viewer を選択します。 
- Save ボタンをクリックします。サービスアカウント名は、 Monitoring Viewer ロールドロップダウンに追加されます。 
ステップ3: ステージを作成する(必要な場合)¶
CREATE STAGE コマンドを使用して、 GCS バケットを参照する外部ステージを作成します。Snowflakeは、ステージングされたデータファイルを外部テーブルメタデータに読み取ります。または、既存の外部ステージを使用できます。
注釈
- クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。 
- CREATE STAGE ステートメントでストレージ統合を参照するには、ロールにストレージ統合オブジェクトに対する USAGE 権限が必要です。 
次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは my_storage_int という名前のストレージ統合を参照します。
USE SCHEMA mydb.public; CREATE STAGE mystage URL='gcs://load/files/' STORAGE_INTEGRATION = my_storage_int;
ステップ4: 自動インジェストを有効にしたパイプを作成する¶
CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。
たとえば、 snowpipe_db.public スキーマにパイプを作成し、 mystage という外部(GCS)ステージにステージングされたファイルから、 mytable という宛先テーブルにデータをロードします。
CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MY_NOTIFICATION_INT'
  AS
    COPY INTO snowpipe_db.public.mytable
      FROM @snowpipe_db.public.mystage/path2;
INTEGRATION パラメーターは、 ステップ1:Snowflakeでクラウドストレージ統合を作成する で作成した my_notification_int 通知統合を参照します。統合名はすべて大文字で提供する必要があります。
重要
COPY INTO <テーブル> ステートメントの保存場所の参照が、アカウントにある既存のパイプの参照と重複していないことを確認します。そうしないと、複数のパイプが、同じデータファイルのセットをターゲットテーブルにロードする可能性があります。たとえば、この状況は、複数のパイプ定義が <ストレージの場所>/path1/ や <ストレージの場所>/path1/path2/ など、細分性のレベルが異なる同じストレージの場所を参照している場合に発生する可能性があります。この例では、ファイルが <ストレージの場所>/path1/path2/ でステージングされている場合は、両方のパイプがファイルのコピーをロードします。
SHOW PIPES を実行するか、Account Usageの PIPES ビューまたはInformation Schemaの PIPES ビューのいずれかをクエリして、アカウントにあるパイプの定義すべての COPY INTO <テーブル> ステートメントを表示します。
自動インジェストを使用したSnowpipeが構成されました。
新しいデータファイルが GCS バケットに追加されると、イベントメッセージはSnowpipeに通知して、パイプで定義されたターゲットテーブルにそれらをロードします。
ステップ5: 履歴ファイルをロードする¶
Pub/Subメッセージが構成される 前 に外部ステージに存在したデータファイルのバックログをロードするには、 ALTER PIPE ... REFRESH ステートメントを実行します。
ステップ6: ステージングされたファイルを削除する¶
データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。
SYSTEM$PIPE_STATUS 出力¶
SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。
TRUEに設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。
{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
出力値の説明については、 SQL 関数の参照トピックをご参照ください。