Google Pub/SubのSnowpipeエラー通知の有効化

このトピックでは、Snowpipeエラー通知を Google Cloud Pub/Sub (Pub/Sub)サービスにプッシュする手順について説明します。

この機能は、次に挙げる型のロードのエラー通知をプッシュできます。

  • 自動インジェストSnowpipe。

  • Snowpipe insertFiles REST API エンドポイントへの呼び出し。

  • Kafka用Snowflakeコネクタを使用したApache KafkaとSnowpipeインジェスチョンメソッド併用のロード限定。

このトピックの内容:

クラウドプラットフォームのサポート

この機能は、Google Cloud Platform(GCP)にホストされているSnowflakeアカウントに制限されています。Snowpipeは、サポートされているクラウドストレージサービスのファイルからデータを読み込むことができます。ただし、Pub/Subへのプッシュ通知は、GCPでホストされているSnowflakeアカウントでのみサポートされます。

メモ

  • Snowflakeは、エラー通知のメッセージ配信が最低1回あることを保証します(つまり、最低1回の試行が成功することを保証するためにメッセージ配信が複数回試行されることにより、メッセージが重複する可能性があります)。

  • この機能は、通知統合オブジェクトを使用して実装されます。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。単一の通知統合で複数のパイプをサポートできます。

エラー通知の有効化

ステップ1: Pub/Subトピックを作成する

Snowflakeからエラー通知メッセージを受信できるPub/Subトピックを作成するか、既存のトピックを再利用します。トピックは、 クラウドシェル または クラウドSDK を使用して作成できます。詳細については、Pub/Subドキュメントの トピックの作成と使用 をご参照ください。

例えば、次のコマンドを実行して、空のトピックを作成します。

$ gsutil notification create -t <topic>
Copy

トピックがすでに存在する場合、コマンドはそれを使用します。それ以外の場合は、新しいトピックが作成されます。

ステップ2: Pub/Subサブスクリプションを作成する

必要に応じて、Pub/Subトピックへのサブスクリプションを作成して、エラー通知を取得します。Cloud Console、 gcloud コマンドラインツール、またはCloud Pub/Sub API を使用して、プル配信があるサブスクリプションを作成することができます。手順については、Pub/Subドキュメントの トピックとサブスクリプションの管理 をご参照ください。

ステップ3: Snowflakeで通知統合を作成する

CREATE NOTIFICATION INTEGRATION コマンドを使用して通知統合を作成します。通知統合は、Pub/Subトピックを参照します。Snowflakeは、通知統合をアカウント用に作成されたGoogle Cloud Platform(GCP)サービスアカウントに関連付けます。Snowflakeは、Snowflakeアカウントのすべての GCP 通知統合によって参照される単一のサービスアカウントを作成します。

注釈

  • このSQLコマンドを実行できるのは、アカウント管理者(ACCOUNTADMINロールを持つユーザー)またはグローバルCREATE INTEGRATION権限を持つロールのみです。

  • 通知統合用の GCP サービスアカウントは、ストレージ統合用に作成されたサービスアカウントとは異なります。

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = TRUE
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Copy

条件:

  • integration_name は、新しい統合の名前です。

  • topic_id は、Snowflakeがエラー通知を送信するPub/Subトピックです。詳細については、 ステップ1: Pub/Subトピックを作成する (このトピック内)をご参照ください。

例:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Copy

ステップ4: Pub/SubサブスクリプションにSnowflakeアクセスを許可する

  1. DESCRIBE INTEGRATION コマンドを実行して、Snowflakeサービスアカウント ID を取得します。

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    条件:

    • integration_name は、「ステップ1: Snowflakeで通知統合を作成する」で作成した統合の名前です。

    例:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. GCP_PUBSUB_SERVICE_ACCOUNT 列にサービスアカウント名を記録します。これは次の形式です。

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. プロジェクトエディターとしてGoogle Cloud Platform Consoleにログインします。

  4. ホームダッシュボードから、 Big Data » Pub/Sub » Subscriptions を選択します。

  5. アクセス用に構成するサブスクリプションを選択します。

  6. 右上隅の SHOW INFO PANEL をクリックします。サブスクリプションの情報パネルがスライドアウトします。

  7. Add members フィールドで、記録したサービスアカウント名を検索します。

  8. Select a role ドロップダウンから、 Pub/Sub Publisher を選択します。

  9. Add ボタンをクリックします。サービスアカウント名は、情報パネルの Pub/Sub Publisher ロールドロップダウンに追加されます。

ステップ5: パイプでエラー通知を有効にする

単一の通知統合は、複数のパイプで共有できます。エラーメッセージの本文は、パイプ、外部ステージとパス、およびエラーが発生したファイルなどの詳細を識別します。

パイプのエラー通知を有効にするには、 ERROR_INTEGRATION パラメーター値を指定します。

注釈

通知統合を参照するパイプを作成または変更するには、通知統合に対する USAGE 権限を持つロールが必要です。さらに、ロールには、スキーマに対する CREATE PIPE 権限、またはパイプに対する OWNERSHIP 権限のいずれかが必要です。

スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。

指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。

セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。

新しいパイプ

CREATE PIPE を使用して新しいパイプを作成します。

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

条件:

ERROR_INTEGRATION = <統合名>

ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

例:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

既存のパイプ

ALTER PIPE を使用して既存のパイプを変更します。

注釈

パイプの作成時に通知統合が指定された場合は、最初にERROR_INTEGRATIONパラメーターの 設定を解除 (ALTER PIPE ... UNSET ERROR_INTEGRATIONを使用)してからパラメーターを設定する必要があります。

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

ここで、 <統合名> は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

例:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

エラー通知のメッセージペイロード

エラーメッセージの本文は、パイプとロード中に発生したエラーを識別します。

以下は、Snowpipeエラーを説明するサンプルのメッセージペイロードです。ペイロードには、1つ以上のエラーメッセージを含めることができます。

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

ペイロードの値を処理するには、文字列を JSON オブジェクトに解析する必要があることに注意してください。