Amazon S3用Snowpipeの自動化

このトピックでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする手順を説明します。

Snowflakeは、コスト、イベントノイズ、遅延を低減するために、Snowpipeでサポートされているイベントのみを送信することをお勧めします。

このトピックの内容:

クラウドプラットフォームのサポート

S3イベントメッセージを使用した自動Snowpipeデータロードのトリガーは、 サポートされているすべてのクラウドプラットフォーム でホストされているSnowflakeアカウントでサポートされています。

ネットワークトラフィック

Virtual Private Snowflake(VPS) および AWS PrivateLink のお客様へのノート:

Amazon SQS 通知を使用したSnowpipeの自動化は正常に機能します。ただし、 VPC ( VPS を含む)内の AWS クラウドストレージは独自のメッセージングサービス(Amazon SQS、Amazon Simple Notification Service)と通信できますが、このトラフィックは、 VPC 外のAmazonの安全なネットワーク上のサーバー間を流れます。したがって、このトラフィックは VPC によって保護されません。

クラウドストレージへの安全なアクセスの構成

注釈

データファイルを格納するS3バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。

このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、S3)ステージで参照されるAmazon S3バケットに対してデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスのSnowflakeオブジェクトであり、秘密キーまたはアクセストークンといった、クラウドプロバイダーの明示的な認証情報を渡す必要がありません。統合オブジェクトには、 AWS IDおよびアクセス管理(IAM)ユーザー IDが保存されます。組織の管理者が、 AWS アカウントの統合 IAM ユーザー権限を付与します。

統合では、統合を使用する外部ステージを作成するときにユーザーが指定できる場所を制限するバケット(およびオプションのパス)もリストできます。

注釈

  • このセクションの手順を完了するには、 IAM ポリシーおよびロールの作成および管理のための AWS の権限が必要です。AWS 管理者でない場合は、 AWS 管理者にこれらのタスクを実行するように依頼します。

  • 現在、ストレージ統合を使用した 政府リージョン のS3ストレージへのアクセスは、同じ政府リージョンの AWS でホストされているSnowflakeアカウントに限定されていることに注意してください。直接認証情報を使用した、政府リージョン外でホストされているアカウントからS3ストレージへのアクセスがサポートされています。

次の図は、S3ステージの統合フローを示しています。

Amazon S3 Stage Integration Flow
  1. 外部(つまり、S3)ステージは、その定義でストレージ統合オブジェクトを参照します。

  2. Snowflakeは、ストレージ統合をアカウント用に作成されたS3 IAM ユーザーに自動的に関連付けます。Snowflakeは、SnowflakeアカウントのすべてのS3ストレージ統合によって参照される単一の IAM ユーザーを作成します。

  3. 組織の AWS 管理者は、 IAM ユーザーにステージ定義で参照されているバケットにアクセスする権限を付与します。多数の外部ステージオブジェクトでは、異なるバケットとパスを参照し、認証に同じストレージ統合を使用できます。

ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケット上の IAM ユーザーに付与された権限を確認します。

注釈

このオプションを強くお勧めします。これにより、クラウドストレージにアクセスするときに IAM 認証情報を提供する必要がなくなります。その他のストレージアクセスオプションについては Amazon S3へのセキュアアクセスの構成 をご参照ください。

このセクションの内容:

ステップ1:S3バケットのアクセス許可を構成する

AWS アクセス制御の要件

Snowflakeでは、フォルダー(およびサブフォルダー)内のファイルにアクセスできるようにするために、S3バケットおよびフォルダーに対する次の権限が必要です。

  • s3:GetBucketLocation

  • s3:GetObject

  • s3:GetObjectVersion

  • s3:ListBucket

ベストプラクティスとして、Snowflakeは、S3バケットへのSnowflakeアクセス用の IAM ポリシーを作成することをお勧めします。その後、ポリシーをロールに添付し、ロールのために AWS によって生成されたセキュリティ認証情報を使用して、バケット内のファイルにアクセスできます。

IAM ポリシーの作成

次の詳細な手順では、S3バケットにアクセスするために AWS 管理コンソールでSnowflakeのアクセス許可を構成する方法について説明します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Account settings を選択します。

  4. Security Token Service Regions リストを展開し、アカウントがある 地域 に対応する AWS 地域を見つけ、ステータスが Inactive の場合は Activate を選択します。

  5. 左側のナビゲーションペインから Policies を選択します。

  6. Create Policy をクリックします。

    Create Policy button on Policies page
  7. JSON タブをクリックします。

  8. SnowflakeがS3バケットとフォルダーにアクセスできるようにするポリシードキュメントを追加します。

    次のポリシー( JSON 形式)は、単一のバケットとフォルダーパスを使用してデータをロードまたはアンロードするために必要な権限をSnowflakeに提供します。

    テキストをコピーしてポリシーエディターに貼り付けます。

    注釈

    • bucketprefix を実際のバケット名とフォルダーパスプレフィックスに確実に置き換えます。

    • 政府リージョン のバケットのAmazonリソースネーム(ARN)には、 arn:aws-us-gov:s3::: プレフィックスがあります。

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    
    Copy

    注釈

    "s3:prefix": 条件を ["*"] または ["< パス>/*"] に設定すると、指定されたバケット内のすべてのプレフィックスまたはバケット内のパスへのアクセスがそれぞれ許可されます。

    AWS ポリシーは、さまざまなセキュリティユースケースをサポートします。

  9. Review policy をクリックします。

  10. ポリシー名(例: snowflake_access)とオプションの説明を入力します。 Create policy をクリックします。

    Create Policy button in Review Policy page

ステップ2: IAM AWS ロールを作成する

AWS 管理コンソールで、 AWS IAM ロールを作成して、データファイルを含むS3バケットに対する権限を付与します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Roles を選択します。

  4. Create role ボタンをクリックします。

    Select Trusted Entity Page in AWS Management Console
  5. 信頼できるエンティティタイプとして Another AWS account を選択します。

  6. Account ID フィールドに、自分の AWS アカウントID を一時的に入力します。後で、信頼関係を変更し、Snowflakeへのアクセスを許可します。

  7. Require external ID オプションを選択します。 0000 などのダミーの ID を入力します。後で信頼関係を変更し、Snowflakeステージの外部 ID を指定します。AWS リソース(つまり、S3)へのアクセスをサードパーティ(つまり、Snowflake)に対して許可するには、外部 ID が必要です。

  8. Next ボタンをクリックします。

  9. ステップ1:S3バケットのアクセス許可を構成する (このトピック内)で作成したポリシーを検索し、このポリシーを選択します。

  10. Next ボタンをクリックします。

    Review Page in AWS Management Console
  11. ロールの名前と説明を入力し、 Create role ボタンをクリックします。

    これで、バケットの IAM ポリシー、および IAM ロールを作成し、ロールにポリシーを添付しました。

  12. 役割の概要ページにある Role ARN 値を記録します。次のステップでは、このロールを参照するSnowflake統合を作成します。

    IAM Role

注釈

Snowflakeは、一時的な認証情報を一定期間キャッシュしますが、60分の有効期限を越えることができません。Snowflakeからのアクセスを取り消すと、ユーザーはキャッシュの有効期限が切れるまでファイルをリストし、クラウドストレージの場所からデータにアクセスできる場合があります。

ステップ3:Snowflakeでクラウドストレージ統合を作成する

CREATE STORAGE INTEGRATION コマンドを使用してストレージ統合を作成します。ストレージ統合は、S3クラウドストレージ用に生成されたIDおよびアクセス管理(IAM)ユーザーと、許可またはブロックされたストレージロケーション(バケットなど)のオプションセットを保存するSnowflakeオブジェクトです。組織のクラウドプロバイダー管理者は、生成されたユーザーにストレージの場所に対するアクセス許可を付与します。このオプションにより、ユーザーはステージの作成時またはデータのロード時に、認証情報の提供を回避できます。

単一のストレージ統合で複数の外部(つまり、S3)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定されたS3バケット(およびオプションのパス)に合わせる必要があります。

注釈

この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Copy

条件:

  • integration_name は、新しい統合の名前です。

  • iam_role は、 ステップ2: AWS で IAM ロールを作成する (このトピック内)で作成したロールのAmazonリソースネーム(ARN)です。

  • bucket は、データファイルを保存するS3バケットの名前です(例: mybucket)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。

  • path は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。

次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。

この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
Copy

ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する

  1. DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成された AWS IAM ユーザーの ARN を取得します。

    DESC INTEGRATION <integration_name>;
    
    Copy

    条件:

    例:

    DESC INTEGRATION s3_int;
    
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    | property                  | property_type | property_value                                                                 | property_default |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------|
    | ENABLED                   | Boolean       | true                                                                           | false            |
    | STORAGE_ALLOWED_LOCATIONS | List          | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/                                | []               |
    | STORAGE_BLOCKED_LOCATIONS | List          | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/    | []               |
    | STORAGE_AWS_IAM_USER_ARN  | String        | arn:aws:iam::123456789001:user/abc1-b-self1234                                 |                  |
    | STORAGE_AWS_ROLE_ARN      | String        | arn:aws:iam::001234567890:role/myrole                                          |                  |
    | STORAGE_AWS_EXTERNAL_ID   | String        | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=                               |                  |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    
    Copy
  2. 次の値を記録します。

    説明

    STORAGE_AWS_IAM_USER_ARN

    Snowflakeアカウント用に作成された AWS IAM ユーザー、この例では arn:aws:iam::123456789001:user/abc1-b-self1234 。Snowflakeアカウント全体用に、単一の IAM ユーザーをプロビジョニングします。すべてのS3ストレージ統合は、その IAM ユーザーを使用します。

    STORAGE_AWS_EXTERNAL_ID

    信頼関係を確立するために必要な外部 ID 。

    これらの値は次のセクションで提供します。

ステップ5:バケットオブジェクトにアクセスするために IAM ユーザー権限を付与する

次の手順では、S3バケットを使用してデータをロードおよびアンロードできるように、 AWS 管理コンソールでSnowflakeの IAM アクセス許可を構成する方法を説明します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Roles を選択します。

  4. ステップ2: IAM AWS ロールを作成する (このトピック内)で作成したロールをクリックします。

  5. Trust relationships タブをクリックします。

  6. Edit trust relationship ボタンをクリックします。

  7. ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する (このトピック内)で記録した DESC STORAGE INTEGRATION 出力値でポリシードキュメントを変更します。

    IAM ロールのポリシードキュメント

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<snowflake_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<snowflake_external_id>"
            }
          }
        }
      ]
    }
    
    Copy

    条件:

    • snowflake_user_arn は、記録した STORAGE_AWS_IAM_USER_ARN 値です。

    • snowflake_external_id は、記録した STORAGE_AWS_EXTERNAL_ID 値です。

      この例では、 snowflake_external_id の値は MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq= です。

      注釈

      セキュリティ上の理由から、新しいストレージ統合を作成する(または CREATE OR REPLACE STORAGE INTEGRATION 構文を使用して既存のストレージ統合を再作成する)と、その統合は異なる外部 ID を持つため、信頼ポリシーが変更されなければ信頼関係を解決することはできません。

  8. Update Trust Policy ボタンをクリックします。変更が保存されます。

注釈

Snowflakeは、一時的な認証情報を一定期間キャッシュしますが、60分の有効期限を越えることができません。Snowflakeからのアクセスを取り消すと、ユーザーはキャッシュの有効期限が切れるまでファイルを一覧表示し、クラウドストレージの場所からデータを読み込むことができる場合があります。

正しいオプションの決定

先に進む前に、データファイルが置かれているS3バケットのターゲットパス(または AWS 用語では「プレフィックス」)にS3イベント通知が存在するかどうかを確認します。AWS ルールでは、同じパスに対して競合する通知を作成することを禁止しています。

Amazon SQS を使用してSnowpipeを自動化するための次のオプションがサポートされています。

  • オプション1。新しいS3イベント通知: S3バケットのターゲットパスのイベント通知を作成します。イベント通知は、ファイルをロードする準備ができると、 SQS キューを介してSnowpipeに通知します。

    これが最も一般的なオプションです。

    重要

    S3バケットに競合するイベント通知が存在する場合は、代わりにオプション2を使用します。

  • オプション2。既存のイベント通知: Amazon Simple Notification Service(SNS) をブロードキャスターとして構成し、Snowpipe自動化用のSnowflake SQS キューを含む、複数のエンドポイント(または「サブスクライバー」。例: SQS キューまたは AWS Lambdaワークロード)に通知を共有するようにします。SNS によってパブリッシュされたS3イベント通知は、ファイルのロードする準備が整ったときに SQS キューを介してSnowpipeに通知します。

  • オプション3。Snowpipeを自動化するためのAmazon EventBridge の設定: オプション2と同様に、S3バケットに対して Amazon EventBridge を有効にして、 SNS トピックに通知を送信するルールを作成することもできます。

オプション1:Snowpipeを自動化するために、新しいS3イベント通知を作成する

このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーするための最も一般的なオプションについて説明します。手順では、データファイルが保存されているS3バケットのターゲットパス(または AWS の用語では「プレフィックス」)のイベント通知を作成する方法を説明します。

重要

S3バケットに対して競合するイベント通知が存在する場合は、代わりに オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する (このトピック内)を使用します。AWS ルールでは、同じターゲットパスに対して競合する通知の作成を禁じています。

次の図は、Snowpipeの自動インジェストプロセスフローを示しています。

Snowpipe Auto-ingest Process Flow
  1. データファイルはステージにロードされます。

  2. S3イベント通知は、SQS キューを介してSnowpipeにファイルをロードする準備ができたことを通知します。Snowpipeはファイルをキューにコピーします。

  3. Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。

注釈

このトピックの手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。

ステップ1:ステージを作成する(必要な場合)

CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。または、既存の外部ステージを使用できます。

注釈

  • クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。

  • CREATE STAGE ステートメントでストレージ統合を参照するには、ロールにストレージ統合オブジェクトに対する USAGE 権限が必要です。

次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは my_storage_int という名前のストレージ統合を参照します。

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

ステップ2: 自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。

次の例では、ユーザーセッションのアクティブスキーマに mypipe という名前のパイプを作成します。パイプは、 mystage ステージでステージングされたファイルから mytable テーブルにデータをロードします。

create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

AUTO_INGEST=true パラメーターは、新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。

重要

パイプ定義のステージ参照を既存のパイプと比較します。同じS3バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが s3://mybucket/path1s3://mybucket/path1/path2 などの異なるレベルの細分性で同じS3バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが s3://mybucket/path1/path2 にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。

これは、Snowpipeの手動設定(自動インジェスト 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはS3イベント通知から生成されたファイルリストを受け取ります。データの重複を避けるために、追加の注意が必要です。

ステップ3: セキュリティを構成する

Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。

注釈

「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。

Snowpipeを使用するには、次の権限を持つロールが必要です。

オブジェクト

権限

メモ

名前付きパイプ

OWNERSHIP

名前付きステージ

USAGE , READ

名前付きファイル形式

USAGE

オプション: ステップ1:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。

ターゲットデータベース

USAGE

ターゲットスキーマ

USAGE

ターゲットテーブル

INSERT , SELECT

GRANT <権限> コマンドを使用して、ロールに権限を付与します。

注釈

ロールを作成して権限を付与できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上、またはアカウントの CREATE ROLE 権限とグローバル MANAGE GRANTS 権限の両方を持つ別のロールのみです。

たとえば、 snowpipe_db.public データベースオブジェクトのセットと mypipe という名前のパイプにアクセスできる snowpipe_role という名前のロールを作成します。次に、ロールをユーザーに付与します。

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

ステップ4:イベント通知を構成する

S3バケットのイベント通知を構成して、新しいデータをロードできるようになったときにSnowpipeに通知します。自動インジェスト機能は、S3からSnowpipeにイベント通知を配信するために SQS キューに依存しています。

使いやすくするために、Snowpipe SQS キューはSnowflakeによって作成および管理されます。SHOW PIPES コマンド出力には、 SQS キューのAmazonリソースネーム(ARN)が表示されます。

  1. SHOW PIPES コマンドを実行します。

    SHOW PIPES;
    
    Copy

    notification_channel 列のステージには、 SQS キューの ARN があります。 ARN を便利な場所にコピーします。

    注釈

    AWS ガイドラインに従って、SnowflakeはS3バケットごとに1つ以下の SQS キューを指定します。この SQS キューは、同じ AWS アカウントの複数のバケットで共有できます。 SQS キューは、S3バケットの外部ステージをターゲットテーブルに接続するすべてのパイプの通知を調整します。データファイルがバケットにアップロードされると、ステージディレクトリパスに一致するすべてのパイプが、対応するターゲットテーブルへのファイルの1回限りのロードを実行します。

  2. Amazon S3コンソールにログインします。

  3. Amazon S3ドキュメント に記載されている手順を使用して、S3バケットのイベント通知を構成します。次のようにフィールドに入力します。

    • Name :イベント通知の名前(例: Auto-ingest Snowflake)。

    • EventsObjectCreate (All) オプションを選択します。

    • Send to :ドロップダウンリストから SQS Queue を選択します。

    • SQS :ドロップダウンリストから Add SQS queue ARN を選択します。

    • SQS queue ARN : SHOW PIPES 出力から SQS キュー名を貼り付けます。

注釈

これらの手順は、S3バケット全体のアクティビティをモニターする単一のイベント通知を作成します。これが最も簡単なアプローチです。この通知は、S3バケットディレクトリでより詳細なレベルで構成されたすべてのパイプを処理します。Snowpipeは、パイプ定義で指定されたデータファイルのみを読み込みます。ただし、パイプ定義外のアクティビティに関する大量の通知は、Snowpipeが通知をフィルタリングしてアクションを実行するレートに悪影響を与える可能性があります。

または、上記の手順で、1つ以上のパスおよび/またはファイル拡張子(または AWS の用語では プレフィックス および サフィックス)を構成して、イベントアクティビティをフィルターします。手順については、関連する AWS ドキュメントトピック のオブジェクトキー名フィルタリング情報をご参照ください。通知でモニターする追加のパスまたはファイル拡張子ごとにこれらのステップを繰り返します。

AWS では、これらの通知 キュー構成 の数がS3バケットごとに最大100に制限されます。

また、 AWS では、同じS3バケットのキュー構成の重複(イベント通知全体)が許可されません。たとえば、既存の通知が s3://mybucket/load/path1 に構成されている場合、 s3://mybucket/load などの上位レベルで別の通知を作成することはできません。その逆も同様です。

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。

ステップ5:履歴ファイルをロードする

SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。

ステップ6: ステージングされたファイルを削除する

データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。

オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する

このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする方法について説明します。この手順では、 Amazon Simple Notification Service(SNS) をブロードキャスターとして構成して、S3バケットのイベント通知を複数のサブスクライバー(例: SQS キューまたは AWS Lambdaワークロード)にパブリッシュする方法を説明します。サブスクライバーには、Snowpipe自動化用のSnowflake SQS キューを含みます。

注釈

これらの手順は、データファイルが置かれているS3バケットのターゲットパスにイベント通知が存在することを前提としています。イベント通知が存在しない場合は、次のいずれかを実行します。

次の図は、Amazon SNSを使用したSnowpipeの自動インジェストのプロセスフローを示しています。

Snowpipe Auto-ingest Process Flow with Amazon SNS
  1. データファイルはステージにロードされます。

  2. SNS によってパブリッシュされたS3イベント通知は、ファイルがロード可能な状態であることを SQS キュー経由でSnowpipeに通知します。Snowpipeはファイルをキューにコピーします。

  3. Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。

注釈

この手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。

Snowpipeの自動インジェストは、 AWS KMSで暗号化された SNS トピックをサポートしています。詳細については、 休眠中の暗号化 をご参照ください。

前提条件:Amazon SNS トピックとサブスクリプションを作成する

  1. AWS アカウントで SNS トピックを作成して、S3バケットのSnowflakeステージロケーションのすべてのメッセージを処理します。

  2. S3イベント通知(例:他の SQS キューまたは AWS Lambdaワークロード)のターゲットの宛先をこのトピックにサブスクライブします。SNS バケットのイベント通知をトピックのすべてのサブスクライバーにパブリッシュします。

手順については、 SNS ドキュメント をご参照ください。

ステップ1:Snowflake SQS キューを SNS トピックに登録する

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Simple Notification Service (SNS)を選択します。

  3. 左側のナビゲーションペインから Topics を選択します。

  4. S3バケットのトピックを見つけます。トピック ARN に注意します。

  5. Snowflakeクライアントを使用して、 SNS トピック ARNで SYSTEM$GET_AWS_SNS_IAM_POLICY システム関数をクエリします。

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    
    Copy

    この関数は、 SNS トピックをサブスクライブするSnowflake SQS キュー許可を付与する IAM ポリシーを返します。

    例:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    Copy
  6. AWS 管理コンソールに戻ります。左側のナビゲーションペインから Topics を選択します。

  7. S3バケットのトピックを選択し、 Edit ボタンをクリックします。 Edit ページが開きます。

  8. Access policy - Optional をクリックして、ページのこの領域を展開します。

  9. SYSTEM$GET_AWS_SNS_IAM_POLICY 関数の結果から追加された IAM ポリシーを JSON ドキュメントにマージします。

    例:

    元の IAM ポリシー(略称):

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         }
       ]
     }
    
    Copy

    統合された IAM ポリシー:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         }
       ]
     }
    
    Copy
  10. 追加のポリシー許可を追加して、S3がバケットのイベント通知を SNS トピックに公開できるようにします。

    例(これらの手順全体で使用される SNS トピック ARN およびS3バケットを使用):

    {
        "Sid":"s3-event-notifier",
        "Effect":"Allow",
        "Principal":{
           "Service":"s3.amazonaws.com"
        },
        "Action":"SNS:Publish",
        "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
        "Condition":{
           "ArnLike":{
              "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
           }
        }
     }
    
    Copy

    統合された IAM ポリシー:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         },
         {
            "Sid":"s3-event-notifier",
            "Effect":"Allow",
            "Principal":{
               "Service":"s3.amazonaws.com"
            },
            "Action":"SNS:Publish",
            "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
            "Condition":{
               "ArnLike":{
                  "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
               }
            }
          }
       ]
     }
    
    Copy
  11. Save changes ボタンをクリックします。

ステップ2: ステージを作成する(必要な場合)

CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。

または、既存の外部ステージを使用できます。

注釈

クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。

次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは my_storage_int という名前のストレージ統合を参照します。

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

ステップ3:自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。COPY ステートメントで、 前提条件:Amazon SNS とサブスクリプションを作成する から SNS トピック ARN を識別します。

次の例では、ユーザーセッションのアクティブスキーマに mypipe という名前のパイプを作成します。パイプは、 mystage ステージでステージングされたファイルから mytable テーブルにデータをロードします。

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

条件:

AUTO_INGEST = true

新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。

AWS_SNS_TOPIC = '<SNSトピックARN>'

現在の例 arn:aws:sns:us-west-2:001234567890:s3_mybucket のように、S3バケットの SNS トピックの ARN を指定します。CREATE PIPE ステートメントは、指定された SNS トピックにSnowflake SQS キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされたインジェストキューにのみファイルをコピーします。

パイプからいずれかのパラメーターを削除するには、現在、 CREATE OR REPLACE PIPE 構文を使用してパイプを再作成する必要があります。

重要

COPY INTO <テーブル> ステートメントの保存場所の参照が、アカウントにある既存のパイプの参照と重複していないことを確認します。そうしないと、複数のパイプが、同じデータファイルのセットをターゲットテーブルにロードする可能性があります。たとえば、この状況は、複数のパイプ定義が <ストレージの場所>/path1/<ストレージの場所>/path1/path2/ など、細分性のレベルが異なる同じストレージの場所を参照している場合に発生する可能性があります。この例では、ファイルが <ストレージの場所>/path1/path2/ でステージングされている場合は、両方のパイプがファイルのコピーをロードします。

SHOW PIPES を実行するか、Account Usageの PIPES ビューまたはInformation Schemaの PIPES ビューのいずれかをクエリして、アカウントにあるパイプの定義すべての COPY INTO <テーブル> ステートメントを表示します。

ステップ4: セキュリティを構成する

Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。

注釈

「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。

Snowpipeを使用するには、次の権限を持つロールが必要です。

オブジェクト

権限

メモ

名前付きパイプ

OWNERSHIP

名前付きストレージ統合

USAGE

ステップ2:ステージを作成する(必要な場合) で作成したステージがステージ統合を参照する場合にのみ必要です。

名前付きステージ

USAGE , READ

名前付きファイル形式

USAGE

オプション: ステップ2:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。

ターゲットデータベース

USAGE

ターゲットスキーマ

USAGE

ターゲットテーブル

INSERT , SELECT

GRANT <権限> コマンドを使用して、ロールに権限を付与します。

注釈

セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上のみがロールを作成できます。

たとえば、 snowpipe_db.public データベースオブジェクトのセットと mypipe という名前のパイプにアクセスできる snowpipe_role という名前のロールを作成します。次に、ロールをユーザーに付与します。

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage, read on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。

ステップ5:履歴ファイルをロードする

SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。

ステップ6: ステージングされたファイルを削除する

データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。

オプション3: Snowpipeを自動化するためにAmazon EventBridge を設定する

オプション2と同様に、Amazon EventBridge を設定してSnowpipeを自動化することもできます。

ステップ1: Amazon SNS トピックを作成する

前提条件: Amazon SNS トピックとサブスクリプションを作成する (このトピック内)に従います。

ステップ2: EventBridge ルールを作成してS3バケットをサブスクライブし、 SNS トピックに通知を送信します

  • S3バケットの Amazon EventBridge を有効にします。

  • EventBridge ルールを作成し、ステップ1で作成した SNS トピックに 通知を送信 します。

ステップ3: SQS 通知を使用してSnowpipeを自動化するためにAmazon SNS を構成する

オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する (このトピック内)に従います。

SYSTEM$PIPE_STATUS 出力

SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。

TRUEに設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。

{"executionState":"<値>","oldestFileTimestamp":<値>,"pendingFileCount":<値>,"notificationChannelName":"<値>","numOutstandingMessagesOnChannel":<値値>,"lastReceivedMessageTimestamp":"<値>","lastForwardedMessageTimestamp":"<値>","error":<値>,"fault":<値>}

出力値の説明については、 SQL 関数の参照トピックをご参照ください。