Google Cloud Storage用Snowpipeの自動化¶
このトピックでは、Googleクラウドストレージ(GCS)イベントの Google Cloud Pub/Sub メッセージを使用して、Snowpipeデータのロードを自動的にトリガーする方法について説明します。
このトピックの内容:
クラウドプラットフォームのサポート¶
GCS Pub / Subイベントメッセージを使用した自動Snowpipeデータロードのトリガーは、次の クラウドプラットフォーム でホストされているSnowflakeアカウントでサポートされています。
Amazonウェブサービス(AWS)
Google Cloud Platform
このサポートを構成する手順は、どちらのクラウドホスティングプラットフォームのアカウントでも同じです。すべてのSnowflakeオブジェクト(統合、ステージ、ファイル形式など)は、ターゲットテーブルを格納するSnowflakeアカウントで作成されます。
GCS から AWS でホストされているSnowflakeアカウントへのSnowpipeデータロードの自動化のサポートは、 プレビュー機能 として提供されます。
クラウドストレージへの安全なアクセスの構成¶
注釈
データファイルを保存する GCS バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。
このセクションでは、Snowflakeストレージ統合オブジェクトを構成して、クラウドストレージの認証責任をSnowflake IDおよびアクセス管理(IAM)エンティティに委任する方法について説明します。
このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、Cloud Storage)ステージで参照されるGoogle Cloud Storageバケットとデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスSnowflakeオブジェクトであり、秘密キーやアクセストークンなどの明示的なクラウドプロバイダー認証情報を渡す必要がありません。代わりに、統合オブジェクトはCloud Storageサービスアカウントを参照します。組織の管理者は、Cloud Storageアカウントのサービスアカウントのアクセス許可を付与します。
管理者は、統合を使用する外部ステージによってアクセスされるCloud Storageバケット(およびオプションのパス)の特定のセットにユーザーを制限することもできます。
注釈
このセクションの手順を完了するには、プロジェクトエディターとしてCloud Storageプロジェクトにアクセスする必要があります。プロジェクトエディターでない場合は、Cloud Storage管理者にこれらのタスクを実行するよう依頼してください。
次の図は、Cloud Storageステージの統合フローを示しています。
外部(つまり、Cloud Storage)ステージは、その定義でストレージ統合オブジェクトを参照します。
Snowflakeは、ストレージ統合をアカウント用に作成されたCloud Storageサービスアカウントに自動的に関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS ストレージ統合によって参照される単一のサービスアカウントを作成します。
Cloud Storageプロジェクトのプロジェクトエディターは、ステージ定義で参照されているバケットにアクセスするためのアクセス許可をサービスアカウントに付与します。多数の外部ステージオブジェクトが、異なるバケットとパスを参照し、認証に同じ統合を使用できます。
ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケットのサービスアカウントに付与された権限を確認します。
このセクションの内容:
ステップ1:Snowflakeでクラウドストレージ統合を作成する¶
CREATE STORAGE INTEGRATION コマンドを使用して統合を作成します。統合は、外部クラウドストレージの認証責任をSnowflakeが生成したエンティティ(つまり、Cloud Storageサービスアカウント)に委任するSnowflakeオブジェクトです。Cloud Storageバケットにアクセスするために、Snowflakeは、データファイルを保存するバケットへのアクセス許可を付与できるサービスアカウントを作成します。
単一のストレージ統合により、複数の外部(つまり、 GCS)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定された GCS バケット(およびオプションのパス)に合わせる必要があります。
注釈
この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
条件:
統合名
は、新しい統合の名前です。バケット
は、データファイルを保存するCloud Storageバケットの名前です(例:mybucket
)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。パス
は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。
次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。
この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。
CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = GCS ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/') STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
ステップ2:Snowflakeアカウント用のCloud Storage Serviceアカウントを取得する¶
DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成されたCloud Storageサービスアカウントの ID を取得します。
DESC STORAGE INTEGRATION <integration_name>;
条件:
統合名
は、 ステップ1:Snowflakeでクラウドストレージ統合を作成する (このトピック内)で作成した統合の名前です。
例:
DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
出力の STORAGE_GCP_SERVICE_ACCOUNT プロパティは、Snowflakeアカウント用に作成されたCloud Storageサービスアカウントを示しています(例: service-account-id@project1-123456.iam.gserviceaccount.com
)。Snowflakeアカウント全体に、単一のCloud Storageサービスアカウントをプロビジョニングします。すべてのCloud Storage統合で、そのサービスアカウントを使用します。
ステップ3:バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する¶
次の詳細な手順では、Cloud Storageバケットを使用してデータをロードおよびアンロードできるようにするために、Google Cloud Platform ConsoleでSnowflake用の IAM アクセス許可を構成する方法を説明します。
カスタム IAM ロールの作成¶
バケットにアクセスしてオブジェクトを取得するために必要な権限を持つカスタムロールを作成します。
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 IAM & admin » Roles を選択します。
Create Role をクリックします。
ロールの名前と説明を入力します。
Add Permissions をクリックします。
権限のリストをフィルターし、リストから次を追加します。
- データのロードのみ
storage.buckets.get
storage.objects.get
storage.objects.list
- パージオプションを使用したデータのロード
storage.buckets.get
storage.objects.delete
storage.objects.get
storage.objects.list
- データのロードとアンロード
storage.buckets.get
storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.list
Create をクリックします。
Cloud Storage Serviceアカウントへのカスタムロールの割り当て¶
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 Storage » Browser を選択します。
アクセス用に構成するバケットを選択します。
右上隅の SHOW INFO PANEL をクリックします。バケットの情報パネルがスライドアウトします。
Add members フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。
Select a role ドロップダウンから、 Storage » Custom »
<ロール>
を選択します。<ロール>
は、 カスタム IAM ロールの作成 (このトピック内)で作成したカスタムクラウドストレージロールです。Add ボタンをクリックします。サービスアカウント名は、情報パネルの Storage Object Viewer ロールドロップダウンに追加されます。
Cloud Key Management Serviceの暗号化キーに対するCloud Storageサービスアカウント権限の付与¶
注釈
この手順は、Google Cloud Key Management Service(Cloud KMS)に保存されているキーを使用して GCS バケットが暗号化されている 場合にのみ 必要です。
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 Security » Cryptographic keys を選択します。
GCS バケットに割り当てられているキーリングを選択します。
右上隅の SHOW INFO PANEL をクリックします。キーリングの情報パネルがスライドアウトします。
Add members フィールドで、 ステップ2:SnowflakeアカウントのCloud Storageサービスアカウントを取得する (このトピック内)の DESCRIBE INTEGRATION 出力からサービスアカウント名を検索します。
Select a role ドロップダウンから、
Cloud KMS CrytoKey Encryptor/Decryptor
ロールを選択します。Add ボタンをクリックします。サービスアカウント名は、情報パネルの Cloud KMS CrytoKey Encryptor/Decryptor ロールドロップダウンに追加されます。
GCS Pub/Subを使用した自動Snowpipeの構成¶
前提条件¶
このトピックの説明では、次のアイテムが作成および構成されていることを前提としています。
- GCP アカウント
GCS バケットからイベントメッセージを受信するPub/Subトピック。詳細については、 Pub/Subトピックの作成 (このトピック内)をご参照ください。
Pub/Subトピックからイベントメッセージを受信するサブスクリプション。詳細については、このトピックの Pub/Subサブスクリプションの作成 をご参照ください。
手順については、 Pub/Subのドキュメント をご参照ください。
- Snowflake
データがロードされるSnowflakeデータベースのターゲットテーブル。
Pub/Subトピックの作成¶
Cloud Shell または Cloud SDK を使用して、Pub/Subトピックを作成します。
次のコマンドを実行してトピックを作成し、指定した GCS バケット内のアクティビティをリッスンできるようにします。
$ gsutil notification create -t <topic> -f json gs://<bucket-name>
条件:
<トピック>
はトピックの名前です。<バケット名>
は、 GCS バケットの名前です。
トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、新しいトピックが作成されます。
詳細については、Pub/Subドキュメントの Cloud StorageのPub/Sub通知の使用 をご参照ください。
Pub/Subサブスクリプションの作成¶
Cloud Console、 gcloud
コマンドラインツール、またはCloud Pub/Sub API を使用して、Pub/Subトピックへのサブスクリプションを作成します。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。
Pub/Subサブスクリプション ID の取得¶
これらの手順では、Pub/Subトピックサブスクリプション ID を使用して、Snowflakeがイベントメッセージにアクセスできるようにします。
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。
トピックサブスクリプションの Subscription ID 列の ID をコピーします
ステップ1: Snowflakeで通知統合を作成する¶
CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。通知統合は、Pub/Subサブスクリプションを参照します。Snowflakeは、通知統合をアカウント用に作成された GCS サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCS 通知統合によって参照される単一のサービスアカウントを作成します。
注釈
この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。
通知統合用の GCS サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。
CREATE NOTIFICATION INTEGRATION <integration_name>
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
条件:
統合名
は、新しい統合の名前です。サブスクリプションID
は、 Pub/Subサブスクリプションの取得 ID で記録したサブスクリプション名です。
例:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
注釈
現在、 ALTER NOTIFICATION INTEGRATION コマンドは、 GCP_PUBSUB_SUBSCRIPTION_NAME パラメーター値の変更をサポートしていません。誤ったパラメーター値が指定されている場合は、通知統合を再作成する必要があります( CREATE OR REPLACE NOTIFICATION INTEGRATION を使用)。
ステップ2:Pub/SubサブスクリプションにSnowflakeアクセスを許可する¶
DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。
DESC NOTIFICATION INTEGRATION <integration_name>;
条件:
統合名
は、 ステップ1:Snowflakeで通知統合を作成する で作成した統合の名前です。
例:
DESC NOTIFICATION INTEGRATION my_notification_int;
GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。
<service_account>@<project_id>.iam.gserviceaccount.com
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。
アクセス用に構成するサブスクリプションを選択します。
右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。
Add members フィールドで、記録したサービスアカウント名を検索します。
Select a role ドロップダウンから、 Pub/Sub Subscriber を選択します。
Add ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Subscriber ロールドロップダウンに追加されます。
Cloud Consoleの Dashboard ページに移動し、ドロップダウンリストからプロジェクトを選択します。
ADD PEOPLE TO THIS PROJECT ボタンをクリックします。
記録したサービスアカウント名を追加します。
Select a role ドロップダウンから、 Monitoring Viewer を選択します。
Add ボタンをクリックします。サービスアカウント名は、 Monitoring Viewer ロールドロップダウンに追加されます。
ステップ3:自動インジェストを有効にしたパイプを作成する¶
CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。
CREATE PIPE <pipe_name>
AUTO_INGEST = true
INTEGRATION = '<integration_name>'
AS
<copy_statement>;
条件:
<パイプ名>
は、パイプの識別子です。パイプが作成されるスキーマに対して一意である必要があります。識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例:
"My object"
)。二重引用符で囲まれた識別子も大文字と小文字が区別されます。<統合名>
は、 ステップ1:Snowflakeでクラウドストレージ統合を作成する で作成した通知統合の名前です。コピーステートメント
は、キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメントです。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。
たとえば、 snowpipe_db.public
スキーマにパイプを作成し、 mystage
という外部(GCS)ステージにステージングされたファイルから、 mytable
という宛先テーブルにデータをロードします。
CREATE PIPE snowpipe_db.public.mypipe
AUTO_INGEST = true
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage/path2;
重要
パイプ定義のステージ参照を既存のパイプと比較します。同じ GCS バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが gcs://mybucket1/path1
や gcs://mybucket1/path1/path2
などの、異なるレベルの細分性で同じ GCS バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが gcs://mybucket1/path1/path2
にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。
これは、Snowpipeの手動設定(自動インジェストは 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはPub/Subメッセージから生成されたファイルリストを受け取ります。データの重複を避けるため、特に注意が必要です。
自動インジェストを使用したSnowpipeが構成されました。
新しいデータファイルが GCS バケットに追加されると、イベントメッセージはSnowpipeに通知して、パイプで定義されたターゲットテーブルにそれらをロードします。
ステップ4:履歴ファイルをロードする¶
Pub/Subメッセージが構成される 前 に外部ステージに存在したデータファイルのバックログをロードするには、 ALTER PIPE ... REFRESH ステートメントを実行します。
SYSTEM$PIPE_STATUS 出力¶
SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。
TRUE に設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。
{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
条件:
executionState
パイプの現在の実行状態は、次のいずれかです。
RUNNING
(つまり、すべてが正常です。Snowflakeはこのパイプのファイルをアクティブに処理する場合としない場合があります)
STOPPED_FEATURE_DISABLED
STOPPED_STAGE_DROPPED
STOPPED_FILE_FORMAT_DROPPED
STOPPED_MISSING_PIPE
STOPPED_MISSING_TABLE
STALLED_COMPILATION_ERROR
STALLED_INITIALIZATION_ERROR
STALLED_EXECUTION_ERROR
STALLED_INTERNAL_ERROR
PAUSED
PAUSED_BY_SNOWFLAKE_ADMIN
PAUSED_BY_ACCOUNT_ADMIN
oldestFileTimestamp
現在キューに入れられているデータファイルの中で最も早いタイムスタンプ(該当する場合)。ファイルがキューに追加されるときにタイムスタンプが設定されます。
pendingFileCount
現在パイプによって処理されているファイルの数。パイプが一時停止されている場合、パイプが一時停止される 前に キューに入れられたファイルが処理されるため、この値は減少します。この値が
0
の場合、このパイプのキューに入れられているファイルがないか、パイプが事実上一時停止しています。notificationChannelName
パイプに関連付けられた GCS ストレージキュー。
numOutstandingMessagesOnChannel
キューに入れられたがまだ受信されていないストレージキュー内のメッセージの数。
lastReceivedMessageTimestamp
ストレージキューから受信した最後のメッセージのタイムスタンプ。このメッセージが特定のパイプに適用されない場合があります(例:メッセージに関連付けられたパス/プレフィックスが、パイプ定義のパス/プレフィックスと一致しない場合)。また、作成されたデータオブジェクトによってトリガーされたメッセージのみが、自動インジェストパイプによって消費されます。
lastForwardedMessageTimestamp
パイプに転送された一致するパス/プレフィックスを持つ最後の「オブジェクトの作成」イベントメッセージのタイムスタンプ。
error
パイプが実行のために最後にコンパイルされたときに生成されたエラーメッセージ(該当する場合)。多くの場合、権限の問題またはオブジェクトの削除により、必要なオブジェクト(テーブル、ステージ、ファイル形式)へのアクセスに問題が生じたことが原因です。
fault
最新の内部Snowflakeプロセスエラー(該当する場合)。主にSnowflakeがデバッグ目的で使用します。