Google Pub/SubのSnowpipeエラー通知の有効化¶
このトピックでは、Snowpipeエラー通知を Google Cloud Pub/Sub (Pub/Sub)サービスにプッシュする手順について説明します。
この機能は、次に挙げる型のロードのエラー通知をプッシュできます。
自動インジェストSnowpipe。
Snowpipe
insertFiles
REST API エンドポイントへの呼び出し。Kafka用Snowflakeコネクタを使用したApache KafkaとSnowpipeインジェスチョンメソッド併用のロード限定。
このトピックの内容:
クラウドプラットフォームのサポート¶
この機能は、Google Cloud Platform(GCP)にホストされているSnowflakeアカウントに制限されています。Snowpipeは、サポートされているクラウドストレージサービスのファイルからデータを読み込むことができます。ただし、Pub/Subへのプッシュ通知は、GCPでホストされているSnowflakeアカウントでのみサポートされます。
メモ¶
Snowflakeは、エラー通知のメッセージ配信が最低1回あることを保証します(つまり、最低1回の試行が成功することを保証するためにメッセージ配信が複数回試行されることにより、メッセージが重複する可能性があります)。
この機能は、通知統合オブジェクトを使用して実装されます。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。単一の通知統合で複数のパイプをサポートできます。
エラー通知の有効化¶
ステップ1: Pub/Subトピックを作成する¶
Snowflakeからエラー通知メッセージを受信できるPub/Subトピックを作成するか、既存のトピックを再利用します。トピックは、 クラウドシェル または クラウドSDK を使用して作成できます。詳細については、Pub/Subドキュメントの トピックの作成と使用 をご参照ください。
例えば、次のコマンドを実行して、空のトピックを作成します。
$ gsutil notification create -t <topic>
トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、新しいトピックが作成されます。
ステップ2: Pub/Subサブスクリプションを作成する¶
必要に応じて、Pub/Subトピックへのサブスクリプションを作成して、エラー通知を取得します。Cloud Console、 gcloud
コマンドラインツール、またはCloud Pub/Sub API を使用して、プル配信があるサブスクリプションを作成することができます。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。
ステップ3: Snowflakeで通知統合を作成する¶
CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。通知統合は、Pub/Subトピックを参照します。Snowflakeは、通知統合をアカウント用に作成されたGoogle Cloud Platform(GCP)サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCP 通知統合によって参照される単一のサービスアカウントを作成します。
注釈
このSQLコマンドを実行できるのは、アカウント管理者(ACCOUNTADMINロールを持つユーザー)またはグローバルCREATE INTEGRATION権限を持つロールのみです。
通知統合用の GCP サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = TRUE
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
条件:
integration_name
は、新しい統合の名前です。topic_id
は、Snowflakeがエラー通知を送信するPub/Subトピックです。詳細については、 ステップ1: Pub/Subトピックを作成する (このトピック内)をご参照ください。
例:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
ステップ4: Pub/SubサブスクリプションにSnowflakeアクセスを許可する¶
DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。
DESC NOTIFICATION INTEGRATION <integration_name>;
条件:
integration_name
は、「ステップ1: Snowflakeで通知統合を作成する」で作成した統合の名前です。
例:
DESC NOTIFICATION INTEGRATION my_notification_int;
GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。
<service_account>@<project_id>.iam.gserviceaccount.com
プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。
ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。
アクセス用に構成するサブスクリプションを選択します。
右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。
Add members フィールドで、記録したサービスアカウント名を検索します。
Select a role ドロップダウンから、 Pub/Sub Publisher を選択します。
Add ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Publisher ロールドロップダウンに追加されます。
ステップ5: パイプでエラー通知を有効にする¶
単一の通知統合は、複数のパイプで共有できます。エラーメッセージの本文は、パイプ、外部ステージとパス、およびエラーが発生したファイルなどの詳細を識別します。
パイプのエラー通知を有効にするには、 ERROR_INTEGRATION パラメーター値を指定します。
注釈
通知統合を参照するパイプを作成または変更するには、通知統合に対する USAGE 権限を持つロールが必要です。さらに、ロールには、スキーマに対する CREATE PIPE 権限、またはパイプに対する OWNERSHIP 権限のいずれかが必要です。
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。
セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。
新しいパイプ¶
CREATE PIPE を使用して新しいパイプを作成します。
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
条件:
ERROR_INTEGRATION = <統合名>
ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。
例:
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
既存のパイプ¶
ALTER PIPE を使用して既存のパイプを変更します。
注釈
パイプの作成時に通知統合が指定された場合は、最初にERROR_INTEGRATIONパラメーターの 設定を解除 (ALTER PIPE ... UNSET ERROR_INTEGRATIONを使用)してからパラメーターを設定する必要があります。
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
ここで、 <統合名>
は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。
例:
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
エラー通知のメッセージペイロード¶
エラーメッセージの本文は、パイプとロード中に発生したエラーを識別します。
以下は、Snowpipeエラーを説明するサンプルのメッセージペイロードです。ペイロードには、1つ以上のエラーメッセージを含めることができます。
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
ペイロードの値を処理するには、文字列を JSON オブジェクトに解析する必要があることに注意してください。