Microsoft Azure Event GridのSnowpipeエラー通知の有効化¶
このトピックでは、Snowpipeのエラー通知を Microsoft Azure Event Grid (Event Grid)にプッシュする手順について説明します。
この機能は、次に挙げる型のロードのエラー通知をプッシュできます。
自動インジェストSnowpipe。
Snowpipe
insertFiles
REST API エンドポイントへの呼び出し。Kafka用Snowflakeコネクタを使用したApache KafkaとSnowpipeインジェスチョンメソッド併用のロード限定。
このトピックの内容:
クラウドプラットフォームのサポート¶
現在この機能は、Microsoft AzureでホストされているSnowflakeアカウントに限定されています。Snowpipeは、サポートされているクラウドストレージサービスのファイルからデータを読み込むことができます。Event Gridへのプッシュ通知は、AzureでホストされているSnowflakeアカウントでのみサポートされます
メモ¶
Snowflakeは、エラー通知のメッセージ配信が最低1回あることを保証します(つまり、最低1回の試行が成功することを保証するためにメッセージ配信が複数回試行されることにより、メッセージが重複する可能性があります)。
この機能は、通知統合オブジェクトを使用して実装されます。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。単一の通知統合で複数のパイプをサポートできます。
エラー通知の有効化¶
ステップ1: カスタムEvent Gridトピックを作成する¶
Event Gridトピックは、ソースがイベント通知を送信するエンドポイントを提供します。Snowflakeによって公開されたエラー通知を受信するための専用トピックを作成します。単一のトピックを使用して、Snowflakeアカウントのすべてのパイプ(Snowpipeエラー通知)またはタスク(タスクエラー通知)のエラー通知を受け取ることができます。
Event Gridトピックの作成手順については、 Event Gridのドキュメント をご参照ください。Event Gridトピックエンドポイントを記録します。これは、これらの手順の後半で必要になります。
必要に応じて、トピックをサブスクライブして、追跡するイベントとそれらのイベントの送信先をEvent Gridに通知します。
ステップ2: Snowflakeで通知統合を作成する¶
テナント ID を取得する¶
AzureテナントIDを取得します。これは、これらの手順の後半で必要になります。
Microsoft Azureポータルにログインします。
Azure Active Directory » Properties に移動します。後で参照できるように、 Tenant ID 値を記録します。Event GridトピックへのSnowflakeアクセスを許可する同意URLを生成するには、ディレクトリIDまたは テナントID が必要です。
通知統合の作成¶
CREATE NOTIFICATION INTEGRATION コマンドを使用して統合を作成します。統合は、作成したAzureストレージキューを参照するSnowflakeオブジェクトです。
注釈
このSQLコマンドを実行できるのは、アカウント管理者(ACCOUNTADMINロールを持つユーザー)またはグローバルCREATE INTEGRATION権限を持つロールのみです。
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_EVENT_GRID
DIRECTION = OUTBOUND
AZURE_EVENT_GRID_TOPIC_ENDPOINT = '<event_grid_topic_endpoint>'
AZURE_TENANT_ID = '<azure_tenant_id>'
例:
CREATE NOTIFICATION INTEGRATION myint
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_EVENT_GRID
DIRECTION = OUTBOUND
AZURE_EVENT_GRID_TOPIC_ENDPOINT = 'https://myaccount.region-1.eventgrid.azure.net/api/events'
AZURE_TENANT_ID = 'mytenantid';
条件:
event_grid_topic_endpoint
は、セクションのステップ1で記録したEvent Gridトピックエンドポイントです。azure_tenant_id
は、このセクションの前半で記録したAzureディレクトリ ID またはテナント ID です。
トピックへのSnowflakeアクセスの許可¶
DESCRIBE INTEGRATION コマンドを実行して、同意 URL を取得します。
DESC NOTIFICATION INTEGRATION <integration_name>;
条件:
integration_name
は、 通知統合を作成する で作成した統合の名前です。
次の列の値に注意します。
- AZURE_CONSENT_URL:
Microsoftのアクセス許可リクエストページへの URL。
- AZURE_MULTI_TENANT_APP_NAME:
アカウント用に作成されたSnowflakeクライアントアプリケーションの名前。このセクションの後半のステップでは、このアプリケーションに、許可されたトピックでアクセストークンを取得するために必要な権限を付与する必要があります。
ウェブブラウザーで AZURE_CONSENT_URL 列にある URL に移動します。このページには、Microsoft許可リクエストページが表示されます。
Accept ボタンをクリックします。このアクションにより、Snowflakeアカウント用に作成されたAzureサービスプリンシパルは、テナント内の指定されたリソースでアクセストークンを取得できるようになります。アクセストークンの取得は、コンテナーに対する適切なアクセス許可をサービスプリンシパルに付与した場合にのみ成功します(次のステップを参照)。
Microsoftの権限リクエストページは、Snowflakeの企業サイト(snowflake.com)にリダイレクトされます。
Microsoft Azureポータルにログインします。
Azure Active Directory » Enterprise applications に移動します。このセクションのステップ2で記録した、Snowflakeアプリケーション識別子がリストされていることを確認します。
重要
Azure Active DirectoryでSnowflakeアプリケーションを後で削除すると、通知統合が機能しなくなります。
Event Grid Topics »
topic_name
に移動します。ここで、topic_name
は、イベント通知を受信するために作成したトピックの名前です。Access Control (IAM) » Add role assignment をクリックします。
Snowflakeサービスプリンシパルを検索します。これは DESC NOTIFICATION INTEGRATION 出力(ステップ1)の AZURE_MULTI_TENANT_APP_NAME プロパティにあるIDです。AZURE_MULTI_TENANT_APP_NAME プロパティで、アンダースコアの 前にある 文字列を検索します。
重要
このセクションのMicrosoftリクエストページでリクエストされたSnowflakeサービスプリンシパルをAzureが作成するのに、1時間以上かかる場合があります。サービスプリンシパルがすぐに利用できない場合は、1~2時間待ってから、もう一度検索することをお勧めします。
サービスプリンシパルを削除すると、通知統合が機能しなくなります。
Snowflakeアプリケーションに EventGridデータ送信者 権限を付与します。
ステップ3: パイプでエラー通知を有効にする¶
単一の通知統合は、複数のパイプで共有できます。エラーメッセージの本文は、パイプ、外部ステージとパス、およびエラーが発生したファイルなどの詳細を識別します。
パイプのエラー通知を有効にするには、 ERROR_INTEGRATION パラメーター値を指定します。
注釈
通知統合を参照するパイプを作成または変更するには、通知統合に対する USAGE 権限を持つロールが必要です。さらに、ロールには、スキーマに対する CREATE PIPE 権限、またはパイプに対する OWNERSHIP 権限のいずれかが必要です。
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。
セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。
新しいパイプ¶
CREATE PIPE を使用して新しいパイプを作成します。
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
条件:
ERROR_INTEGRATION = <統合名>
ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。
例:
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
既存のパイプ¶
ALTER PIPE を使用して既存のパイプを変更します。
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
ここで、 <統合名>
は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。
例:
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
エラー通知のメッセージペイロード¶
エラーメッセージの本文は、パイプとロード中に発生したエラーを識別します。
以下は、Snowpipeエラーを説明するサンプルのメッセージペイロードです。ペイロードには、1つ以上のエラーメッセージを含めることができます。
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
ペイロードの値を処理するには、文字列を JSON オブジェクトに解析する必要があることに注意してください。