Microsoft Azure Event GridのSnowpipeエラー通知の有効化

このトピックでは、Snowpipeのエラー通知を Microsoft Azure Event Grid (Event Grid)にプッシュする手順について説明します。

この機能は、次に挙げる型のロードのエラー通知をプッシュできます。

  • 自動インジェストSnowpipe。

  • Snowpipe insertFiles REST API エンドポイントへの呼び出し。

  • Kafka用Snowflakeコネクタを使用したApache KafkaとSnowpipeインジェスチョンメソッド併用のロード限定。

このトピックの内容:

クラウドプラットフォームのサポート

現在この機能は、Microsoft AzureでホストされているSnowflakeアカウントに限定されています。Snowpipeは、サポートされているクラウドストレージサービスのファイルからデータを読み込むことができます。Event Gridへのプッシュ通知は、AzureでホストされているSnowflakeアカウントでのみサポートされます

メモ

  • Snowflakeは、エラー通知のメッセージ配信が最低1回あることを保証します(つまり、最低1回の試行が成功することを保証するためにメッセージ配信が複数回試行されることにより、メッセージが重複する可能性があります)。

  • この機能は、通知統合オブジェクトを使用して実装されます。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。単一の通知統合で複数のパイプをサポートできます。

エラー通知の有効化

ステップ1: カスタムEvent Gridトピックを作成する

Event Gridトピックは、ソースがイベント通知を送信するエンドポイントを提供します。Snowflakeによって公開されたエラー通知を受信するための専用トピックを作成します。単一のトピックを使用して、Snowflakeアカウントのすべてのパイプ(Snowpipeエラー通知)またはタスク(タスクエラー通知)のエラー通知を受け取ることができます。

Event Gridトピックの作成手順については、 Event Gridのドキュメント をご参照ください。Event Gridトピックエンドポイントを記録します。これは、これらの手順の後半で必要になります。

必要に応じて、トピックをサブスクライブして、追跡するイベントとそれらのイベントの送信先をEvent Gridに通知します。

ステップ2: Snowflakeで通知統合を作成する

テナント ID を取得する

AzureテナントIDを取得します。これは、これらの手順の後半で必要になります。

  1. Microsoft Azureポータルにログインします。

  2. Azure Active Directory » Properties に移動します。後で参照できるように、 Tenant ID 値を記録します。Event GridトピックへのSnowflakeアクセスを許可する同意URLを生成するには、ディレクトリIDまたは テナントID が必要です。

通知統合の作成

CREATE NOTIFICATION INTEGRATION コマンドを使用して統合を作成します。統合は、作成したAzureストレージキューを参照するSnowflakeオブジェクトです。

注釈

このSQLコマンドを実行できるのは、アカウント管理者(ACCOUNTADMINロールを持つユーザー)またはグローバルCREATE INTEGRATION権限を持つロールのみです。

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_EVENT_GRID
  DIRECTION = OUTBOUND
  AZURE_EVENT_GRID_TOPIC_ENDPOINT = '<event_grid_topic_endpoint>'
  AZURE_TENANT_ID = '<azure_tenant_id>'
Copy

例:

CREATE NOTIFICATION INTEGRATION myint
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_EVENT_GRID
  DIRECTION = OUTBOUND
  AZURE_EVENT_GRID_TOPIC_ENDPOINT = 'https://myaccount.region-1.eventgrid.azure.net/api/events'
  AZURE_TENANT_ID = 'mytenantid';
Copy

条件:

  • event_grid_topic_endpoint は、セクションのステップ1で記録したEvent Gridトピックエンドポイントです。

  • azure_tenant_id は、このセクションの前半で記録したAzureディレクトリ ID またはテナント ID です。

トピックへのSnowflakeアクセスの許可

  1. DESCRIBE INTEGRATION コマンドを実行して、同意 URL を取得します。

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    条件:

    次の列の値に注意します。

    AZURE_CONSENT_URL

    Microsoftのアクセス許可リクエストページへの URL。

    AZURE_MULTI_TENANT_APP_NAME

    アカウント用に作成されたSnowflakeクライアントアプリケーションの名前。このセクションの後半のステップでは、このアプリケーションに、許可されたトピックでアクセストークンを取得するために必要な権限を付与する必要があります。

  2. ウェブブラウザーで AZURE_CONSENT_URL 列にある URL に移動します。このページには、Microsoft許可リクエストページが表示されます。

  3. Accept ボタンをクリックします。このアクションにより、Snowflakeアカウント用に作成されたAzureサービスプリンシパルは、テナント内の指定されたリソースでアクセストークンを取得できるようになります。アクセストークンの取得は、コンテナーに対する適切なアクセス許可をサービスプリンシパルに付与した場合にのみ成功します(次のステップを参照)。

    Microsoftの権限リクエストページは、Snowflakeの企業サイト(snowflake.com)にリダイレクトされます。

  4. Microsoft Azureポータルにログインします。

  5. Azure Active Directory » Enterprise applications に移動します。このセクションのステップ2で記録した、Snowflakeアプリケーション識別子がリストされていることを確認します。

    重要

    Azure Active DirectoryでSnowflakeアプリケーションを後で削除すると、通知統合が機能しなくなります。

  6. Event Grid Topics » topic_name に移動します。ここで、 topic_name は、イベント通知を受信するために作成したトピックの名前です。

  7. Access Control (IAM) » Add role assignment をクリックします。

  8. Snowflakeサービスプリンシパルを検索します。これは DESC NOTIFICATION INTEGRATION 出力(ステップ1)の AZURE_MULTI_TENANT_APP_NAME プロパティにあるIDです。AZURE_MULTI_TENANT_APP_NAME プロパティで、アンダースコアの 前にある 文字列を検索します。

    重要

    • このセクションのMicrosoftリクエストページでリクエストされたSnowflakeサービスプリンシパルをAzureが作成するのに、1時間以上かかる場合があります。サービスプリンシパルがすぐに利用できない場合は、1~2時間待ってから、もう一度検索することをお勧めします。

    • サービスプリンシパルを削除すると、通知統合が機能しなくなります。

  9. Snowflakeアプリケーションに EventGridデータ送信者 権限を付与します。

ステップ3: パイプでエラー通知を有効にする

単一の通知統合は、複数のパイプで共有できます。エラーメッセージの本文は、パイプ、外部ステージとパス、およびエラーが発生したファイルなどの詳細を識別します。

パイプのエラー通知を有効にするには、 ERROR_INTEGRATION パラメーター値を指定します。

注釈

通知統合を参照するパイプを作成または変更するには、通知統合に対する USAGE 権限を持つロールが必要です。さらに、ロールには、スキーマに対する CREATE PIPE 権限、またはパイプに対する OWNERSHIP 権限のいずれかが必要です。

スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。

指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。

セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。

新しいパイプ

CREATE PIPE を使用して新しいパイプを作成します。

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

条件:

ERROR_INTEGRATION = <統合名>

ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

例:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

既存のパイプ

ALTER PIPE を使用して既存のパイプを変更します。

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

ここで、 <統合名> は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

例:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

エラー通知のメッセージペイロード

エラーメッセージの本文は、パイプとロード中に発生したエラーを識別します。

以下は、Snowpipeエラーを説明するサンプルのメッセージペイロードです。ペイロードには、1つ以上のエラーメッセージを含めることができます。

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

ペイロードの値を処理するには、文字列を JSON オブジェクトに解析する必要があることに注意してください。