Amazon SNS のSnowpipeエラー通知の有効化

このトピックでは、Snowpipeエラー通知を Amazon Simple Notification Service (SNS)サービスにプッシュする手順について説明します。SNS は、パブリッシュ/サブスクライブメッセージングサービスです。

この機能は、次に挙げる型のロードのエラー通知をプッシュできます。

  • 自動インジェストSnowpipe。

  • Snowpipe insertFiles REST API エンドポイントへの呼び出し。

  • Kafka用Snowflakeコネクタを使用したApache KafkaとSnowpipeインジェスチョンメソッド併用のロード限定。

このトピックの内容:

クラウドプラットフォームのサポート

現在この機能は、Amazon Web Services(AWS)でホストされているSnowflakeアカウントに限定されています。Snowpipeは、サポートされているクラウドストレージサービスのファイルからデータを読み込むことができます。ただし、SNSへのプッシュ通知は、AWSでホストされているSnowflakeアカウントでのみサポートされます。

メモ

  • この機能は、通知統合オブジェクトを使用して実装されます。通知統合は、Snowflakeとサードパーティのクラウドメッセージキューサービス間のインターフェイスを提供するSnowflakeオブジェクトです。単一の通知統合で複数のパイプをサポートできます。

  • Snowflakeは、エラー通知のメッセージ配信が最低1回あることを保証します(つまり、最低1回の試行が成功することを保証するためにメッセージ配信が複数回試行されることにより、メッセージが重複する可能性があります)。

エラー通知の有効化

ステップ1: Amazon SNS トピックの作成

AWS アカウントに SNS トピックを作成して、すべてのエラーメッセージを処理します。SNS トピックのAmazonリソースネーム(ARN)を記録します。

注釈

SNS の標準トピックのみをサポートしています。エラー通知で使用する SNS FIFO (先入れ先出し)トピックは作成しないでください。現在、 FIFO トピックに送信されたエラー通知は警告なしで失敗します。

リージョン 間で通知を送信する際に遅延を低減し、 データエグレス の料金を回避するには、Snowflakeアカウントと同じリージョンに SNS トピックを作成することをお勧めします。

手順については、 SNS ドキュメントの Amazon SNS トピック の作成をご参照ください。

ステップ2: IAM ポリシーを作成する

SNS トピックに公開するための権限を付与する、 AWS Identity and Access Management(IAM)ポリシーを作成します。ポリシーは、次のアクションを定義します。

  • sns:publish

    SNS トピックに公開します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

  3. 左側のナビゲーションペインから Account settings を選択します。

  4. Security Token Service Regions リストを展開し、アカウントがある 地域 に対応する AWS 地域を見つけ、ステータスが Inactive の場合は Activate を選択します。

  5. 左側のナビゲーションペインから Policies を選択します。

  6. Create Policy をクリックします。

  7. JSON タブをクリックします。

  8. SNS トピックで実行できるアクションを定義するポリシードキュメントを追加します。

    次のテキストをコピーしてポリシーエディターに貼り付けます。

    {
        "Version": "2012-10-17",
        "Statement": [
          {
             "Effect": "Allow",
             "Action": [
                 "sns:Publish"
             ],
             "Resource": "<sns_topic_arn>"
          }
        ]
     }
    
    Copy

    sns_topic_arnステップ1: Amazon SNS トピックを作成する (このトピック内)で作成した SNS トピックの ARN に置き換えます。

  9. Review policy をクリックします。

  10. ポリシー名(例: snowflake_sns_topic)とオプションの説明を入力します。 Create policy をクリックします。

ステップ3: AWS IAM ロールを作成する

SNS トピックに権限を割り当てる AWS IAM ロールを作成します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

  3. 左側のナビゲーションペインから Roles を選択します。

  4. Create role ボタンをクリックします。

  5. 信頼できるエンティティタイプとして Another AWS account を選択します。

  6. Account ID フィールドに、自分の AWS アカウントID を一時的に入力します。

  7. Require external ID オプションを選択します。このオプションを使用すると、Amazonアカウントリソース(つまり SNS)に対するアクセス許可をサードパーティ(つまりSnowflake)に付与できます。

    今は、 0000 などのダミー ID を入力します。後で、信頼関係を変更し、アカウント用に生成されたSnowflake IAM ユーザーのダミー ID を外部 ID に置き換えます。IAM ロールの信頼ポリシーの条件により、Snowflakeユーザーは、後で作成する通知統合オブジェクトを使用してロールを引き受けることができます。

  8. Next ボタンをクリックします。

  9. ステップ2: IAM ポリシーを作成する (このトピック内)で作成したポリシーを検索し、このポリシーを選択します。

  10. Next ボタンをクリックします。

  11. ロールの名前と説明を入力し、 Create role ボタンをクリックします。

  12. ロールの概要ページにある Role ARN 値を記録します。この値は、後にある1つ以上のステップで指定します。

ステップ4: 通知統合を作成する

CREATE NOTIFICATION INTEGRATION を使用して、通知統合を作成します。統合は、作成した SNS トピックを参照するSnowflakeオブジェクトです。

単一の通知統合で複数の タスクパイプ をサポートできます。

注釈

この SQL コマンドを実行できるのは、アカウント管理者(ACCOUNTADMIN ロールを持つユーザー)または CREATE INTEGRATION グローバル権限を持つロールのみです。

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = '<topic_arn>'
  AWS_SNS_ROLE_ARN = '<iam_role_arn>'
Copy

条件:

<統合名>

新しい統合の名前。

DIRECTION = OUTBOUND

Snowflakeに関するクラウドメッセージングの方向。エラー通知を構成する場合にのみ必要です。

<トピックのARN>

ステップ1: Amazon SNS トピックで作成する (このトピック内)で記録した SNS トピックの ARN。

<IAMロールのARN>

ステップ3: AWS IAM ロールを作成する (このトピック内)で記録した IAM ロールの ARN。

例:

CREATE NOTIFICATION INTEGRATION my_notification_int
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-east-2:111122223333:sns_topic'
  AWS_SNS_ROLE_ARN = 'arn:aws:iam::111122223333:role/error_sns_role';
Copy

ステップ5: SNS トピックにSnowflakeアクセスを許可する

IAM ユーザー ARN と SNS トピック外部 ID の取得

  1. DESCRIBE INTEGRATION を実行します。

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    条件:

    • integration_name は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

    例:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    +---------------------------+-------------------+------------------------------------------------------+----------------------+
    |   property                |   property_type   |   property_value                                     |   property_default   |
    +---------------------------+-------------------+------------------------------------------------------+----------------------+
    |   ENABLED                 |   Boolean         |   true                                               |   false              |
    |   NOTIFICATION_PROVIDER   |   String          |   AWS_SNS                                            |                      |
    |   DIRECTION               |   String          |   OUTBOUND                                           |   INBOUND            |
    |   AWS_SNS_TOPIC_ARN       |   String          |   arn:aws:sns:us-east-2:111122223333:myaccount       |                      |
    |   AWS_SNS_ROLE_ARN        |   String          |   arn:aws:iam::111122223333:role/myrole              |                      |
    |   SF_AWS_IAM_USER_ARN     |   String          |   arn:aws:iam::123456789001:user/c_myaccount         |                      |
    |   SF_AWS_EXTERNAL_ID      |   String          |   MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=   |                      |
    +---------------------------+-------------------+------------------------------------------------------+----------------------+
    
    Copy
  2. 生成された次の値を記録します。

    SF_AWS_IAM_USER_ARN

    アカウント用に作成されたSnowflake IAM ユーザーの ARN。Snowflakeアカウントのユーザーは、通知統合を使用してこのユーザーの外部 ID を送信することにより、 ステップ3: AWS IAM ロールを作成する で作成した IAM ロールを引き受けます。

    SF_AWS_EXTERNAL_ID

    使用するアカウント用に作成されたSnowflake IAM ユーザーの外部 ID。

    次のステップでは、 IAM ロールの信頼関係をこれらの値で更新します。

DIRECTION プロパティは、Snowflakeに関するクラウドメッセージングの方向を示していることに注意してください。

IAM ロールの信頼関係の変更

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

  3. 左側のナビゲーションペインから Roles を選択します。

  4. ステップ3: AWS IAM ロールを作成する (このトピック内)で作成したロールをクリックします。

  5. Trust relationships タブをクリックします。

  6. Edit trust relationship ボタンをクリックします。

  7. IAM ユーザーの ARN および SNS トピックの外部 ID (このトピック内)で記録した DESC NOTIFICATION INTEGRATION 出力値でポリシードキュメントを変更します。

    IAM ロールのポリシードキュメント

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<sf_aws_iam_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<sf_aws_external_id>"
            }
          }
        }
      ]
    }
    
    Copy

    条件:

    • sf_aws_iam_user_arn は、記録した SF_AWS_IAM_USER_ARN 値です。

    • sf_aws_external_id は、記録した SF_AWS_EXTERNAL_ID 値です。

  8. Update Trust Policy ボタンをクリックします。変更が保存されます。

ステップ6: パイプでエラー通知を有効にする

単一の通知統合は、複数のパイプで共有できます。エラーメッセージの本文は、パイプ、外部ステージとパス、およびエラーが発生したファイルなどの詳細を識別します。

パイプのエラー通知を有効にするには、 ERROR_INTEGRATION パラメーター値を指定します。

注釈

通知統合を参照するパイプを作成または変更するには、通知統合に対する USAGE 権限を持つロールが必要です。さらに、ロールには、スキーマに対する CREATE PIPE 権限、またはパイプに対する OWNERSHIP 権限のいずれかが必要です。

スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。

指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。

セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。

新しいパイプ

CREATE PIPE を使用して新しいパイプを作成します。自動データロードの構成(つまり、自動インジェストSnowpipe)には、追加のパイプパラメーターが必要であることに注意してください。手順については、 クラウドメッセージングを使用した連続データロードの自動化 をご参照ください。

CREATE PIPE <name>
  [ AUTO_INGEST = TRUE | FALSE  ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

条件:

ERROR_INTEGRATION = <統合名>

ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

次の例は、エラー通知と自動データロードの両方をサポートする CREATE PIPE ステートメントを示しています。

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

既存のパイプ

ALTER PIPE を使用して既存のパイプを変更します。

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

ここで、 <統合名> は、 ステップ4: 通知統合を作成する (このトピック内)で作成した通知統合の名前です。

例:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

エラー通知のメッセージペイロード

エラーメッセージの本文は、パイプとロード中に発生したエラーを識別します。

以下は、Snowpipeエラーを説明するサンプルのメッセージペイロードです。ペイロードには、1つ以上のエラーメッセージを含めることができます。

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

ペイロードの値を処理するには、文字列を JSON オブジェクトに解析する必要があることに注意してください。