Amazon S3用Snowpipeの自動化

このトピックでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする手順を説明します。

注釈

この機能は、Amazon Web ServicesでホストされているSnowflakeアカウントに限定されています。

このトピックの内容:

ネットワークトラフィック

Virtual Private Snowflake(VPS) および AWS PrivateLink のお客様へのノート:

Amazon SQS 通知を使用したSnowpipeの自動化は正常に機能します。ただし、 VPC ( VPS を含む)内の AWS クラウドストレージは独自のメッセージングサービス(Amazon SQS、Amazon Simple Notification Service)と通信できますが、このトラフィックは、 VPC 外のAmazonの安全なネットワーク上のサーバー間を流れます。したがって、このトラフィックは VPC によって保護されません。

クラウドストレージへの安全なアクセスの構成

注釈

データファイルを格納するS3バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。

このセクションでは、Snowflakeストレージ統合オブジェクトを構成して、クラウドストレージの認証責任をSnowflake IDおよびアクセス管理(IAM)エンティティに委任する方法について説明します。

注釈

このオプションを強くお勧めします。これにより、クラウドストレージにアクセスするときに IAM 認証情報を提供する必要がなくなります。その他のストレージアクセスオプションについては Amazon S3へのセキュアアクセスの構成 をご参照ください。

このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、S3)ステージで参照されるAmazon S3バケットに対してデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスのSnowflakeオブジェクトであり、秘密キーまたはアクセストークンといった、クラウドプロバイダーの明示的な認証情報を渡す必要がありません。統合オブジェクトには、 AWS IDおよびアクセス管理(IAM)ユーザー IDが保存されます。組織の管理者が、 AWS アカウントの統合 IAM ユーザー権限を付与します。

統合では、統合を使用する外部ステージを作成するときにユーザーが指定できる場所を制限するバケット(およびオプションのパス)もリストできます。

注釈

このセクションの手順を完了するには、 IAM ポリシーおよびロールの作成および管理のための AWS の権限が必要です。 AWS 管理者でない場合は、 AWS 管理者にこれらのタスクを実行するよう依頼します。

次の図は、S3ステージの統合フローを示しています。

Amazon S3 Stage Integration Flow
  1. 外部(つまり、S3)ステージは、その定義でストレージ統合オブジェクトを参照します。

  2. Snowflakeは、ストレージ統合をアカウント用に作成されたS3 IAM ユーザーに自動的に関連付けます。Snowflakeは、SnowflakeアカウントのすべてのS3ストレージ統合によって参照される単一の IAM ユーザーを作成します。

  3. 組織の AWS 管理者は、 IAM ユーザーにステージ定義で参照されているバケットにアクセスする権限を付与します。多数の外部ステージオブジェクトでは、異なるバケットとパスを参照し、認証に同じストレージ統合を使用できます。

ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケット上の IAM ユーザーに付与された権限を確認します。

このセクションの内容:

ステップ1:S3バケットのアクセス許可を構成する

AWS アクセス制御の要件

Snowflakeでは、フォルダー(およびサブフォルダー)内のファイルにアクセスできるようにするために、S3バケットおよびフォルダーに対する次の権限が必要です。

  • s3:GetObject

  • s3:GetObjectVersion

  • s3:ListBucket

ベストプラクティスとして、Snowflakeは、S3バケットへのSnowflakeアクセス用の IAM ポリシーを作成することをお勧めします。その後、ポリシーをロールに添付し、ロールのために AWS によって生成されたセキュリティ認証情報を使用して、バケット内のファイルにアクセスできます。

IAM ポリシーの作成

次の詳細な手順では、S3バケットを使用してデータをロードおよびアンロードできるように、AWS 管理コンソールでSnowflakeのアクセス許可を構成する方法について説明します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Account settings を選択します。

  4. Security Token Service Regions リストを展開し、アカウントがある 地域 に対応する AWS 地域を見つけ、ステータスが Inactive の場合は Activate を選択します。

  5. 左側のナビゲーションペインから Policies を選択します。

  6. Create Policy をクリックします。

    Create Policy button on Policies page
  7. JSON タブをクリックします。

  8. SnowflakeがS3バケットとフォルダーにアクセスできるようにするポリシードキュメントを追加します。

    次のポリシー( JSON 形式)は、単一のバケットとフォルダーパスを使用してデータをロードまたはアンロードするために必要な権限をSnowflakeに提供します。

    テキストをコピーしてポリシーエディターに貼り付けます。

    注釈

    バケットプレフィックス を実際のバケット名とフォルダーパスプレフィックスに置き換えてください。

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion",
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    

    重要

    "s3:prefix": 条件を ["*"] に設定すると、指定されたバケット内のすべてのプレフィックスに対するアクセスが許可されます。バケットに1000個を超えるオブジェクトが存在する場合、次のエラーが発生する可能性があります。 Access Denied (Status Code: 403; Error Code: AccessDenied)

    エラーを回避するには、次のように IAM ポリシーから条件を削除します。例:

    "Condition": {
          "StringLike": {
              "s3:prefix": [
                  "*"
              ]
          }
      }
    

    ポリシーは引き続きバケット内のファイルへのアクセスを許可しますが、バケットに1000個を超えるオブジェクトが存在する場合でも、S3はエラーを返しません。

    AWS ポリシーは、さまざまなセキュリティユースケースをサポートします。

    次のポリシーは、単一の 読み取り専用 バケットとフォルダーパスからデータをロードするために必要な権限をSnowflakeに提供します。ポリシーには、 s3:GetObjects3:GetObjectVersion、および s3:ListBucket の権限が含まれます。

    代替ポリシー:読み取り専用のS3バケットからロードする

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    
  9. Review policy をクリックします。

  10. ポリシー名(例: snowflake_access)とオプションの説明を入力します。Create policy をクリックします。

    Create Policy button in Review Policy page

ステップ2: IAM AWS ロールを作成する

AWS 管理コンソールで、 AWS IAM ロールを作成して、データファイルを含むS3バケットに対する権限を付与します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Roles を選択します。

  4. Create role ボタンをクリックします。

    Select Trusted Entity Page in AWS Management Console
  5. 信頼できるエンティティタイプとして Another AWS account を選択します。

  6. Account ID フィールドに、自分の AWS アカウントID を一時的に入力します。後で、信頼関係を変更し、Snowflakeへのアクセスを許可します。

  7. Require external ID オプションを選択します。 0000 などのダミーの ID を入力します。後で信頼関係を変更し、Snowflakeステージの外部 ID を指定します。AWS リソース(つまり、S3)からサードパーティ(つまり、Snowflake)へのアクセスを許可するには、外部 ID が必要です。

  8. Next ボタンをクリックします。

  9. ステップ1:S3バケットのアクセス許可を構成する (このトピック内)で作成したポリシーを検索し、このポリシーを選択します。

  10. Next ボタンをクリックします。

    Review Page in AWS Management Console
  11. ロールの名前と説明を入力し、 Create role ボタンをクリックします。

    これで、バケットの IAM ポリシー、および IAM ロールを作成し、ロールにポリシーを添付しました。

  12. 役割の概要ページにある Role ARN 値を記録します。次のステップでは、このロールを参照するSnowflake統合を作成します。

    IAM Role

ステップ3:Snowflakeでクラウドストレージ統合を作成する

CREATE STORAGE INTEGRATION コマンドを使用してストレージ統合を作成します。ストレージ統合は、S3クラウドストレージ用に生成されたIDおよびアクセス管理(IAM)ユーザーと、許可またはブロックされたストレージロケーション(バケットなど)のオプションセットを保存するSnowflakeオブジェクトです。組織のクラウドプロバイダー管理者は、生成されたユーザーにストレージの場所に対するアクセス許可を付与します。このオプションにより、ユーザーはステージの作成時またはデータのロード時に、認証情報の提供を回避できます。

単一のストレージ統合で複数の外部(つまり、S3)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定されたS3バケット(およびオプションのパス)に合わせる必要があります。

注釈

この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

条件:

  • 統合名 は、新しい統合の名前です。

  • IAMロール は、 ステップ2: IAM AWS ロールを作成する (このトピック内)で作成したロールのAmazon Resource Name(ARN)です。

  • バケット は、データファイルを保存するS3バケットの名前です(例: mybucket)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。

  • パス は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。

次の例では、2つのバケットとパスのいずれかを参照するために統合を使用する外部ステージを明示的に制限する統合を作成します。後のステップで、これらのバケットとパスのいずれかを参照する外部ステージを作成します。

この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');

ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する

  1. DESCRIBE INTEGRATION コマンドを実行して、Snowflakeアカウント用に自動的に作成された AWS IAM ユーザーの ARN を取得します。

    DESC INTEGRATION <integration_name>;
    

    条件:

    例:

    DESC INTEGRATION s3_int;
    
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    | property                  | property_type | property_value                                                                 | property_default |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------|
    | ENABLED                   | Boolean       | true                                                                           | false            |
    | STORAGE_ALLOWED_LOCATIONS | List          | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/                                | []               |
    | STORAGE_BLOCKED_LOCATIONS | List          | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/    | []               |
    | STORAGE_AWS_IAM_USER_ARN  | String        | arn:aws:iam::123456789001:user/abc1-b-self1234                                 |                  |
    | STORAGE_AWS_ROLE_ARN      | String        | arn:aws:iam::001234567890:role/myrole                                          |                  |
    | STORAGE_AWS_EXTERNAL_ID   | String        | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=                               |                  |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    
  2. 次の値を記録します。

    説明

    STORAGE_AWS_IAM_USER_ARN

    Snowflakeアカウント用に作成された AWS IAM ユーザー、この例では arn:aws:iam::123456789001:user/abc1-b-self1234 。Snowflakeアカウント全体用に、単一の IAM ユーザーをプロビジョニングします。すべてのS3ストレージ統合は、その IAM ユーザーを使用します。

    STORAGE_AWS_EXTERNAL_ID

    信頼関係を確立するために必要な外部 ID 。

    これらの値は次のセクションで提供します。

ステップ5:バケットオブジェクトにアクセスするために IAM ユーザー権限を付与する

次の手順では、S3バケットを使用してデータをロードおよびアンロードできるように、 AWS 管理コンソールでSnowflakeの IAM アクセス許可を構成する方法を説明します。

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。

    Identity & Access Management in AWS Management Console
  3. 左側のナビゲーションペインから Roles を選択します。

  4. ステップ2: IAM AWS ロールを作成する (このトピック内)で作成したロールをクリックします。

  5. Trust relationships タブをクリックします。

  6. Edit trust relationship ボタンをクリックします。

  7. ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する (このトピック内)で記録した DESC STORAGE INTEGRATION 出力値でポリシードキュメントを変更します。

    IAM ロールのポリシードキュメント

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<snowflake_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<snowflake_external_id>"
            }
          }
        }
      ]
    }
    

    条件:

    • snowflake_external_id は、記録した STORAGE_AWS_EXTERNAL_ID 値です。

    • snowflake_user_arn は、記録した STORAGE_AWS_IAM_USER_ARN 値です。

  8. Update Trust Policy ボタンをクリックします。変更が保存されます。

正しいオプションの決定

先に進む前に、データファイルが置かれているS3バケットのターゲットパス(または AWS 用語では「プレフィックス」)にS3イベント通知が存在するかどうかを確認します。AWS ルールでは、同じパスに対して競合する通知を作成することを禁止しています。

Amazon SQS を使用してSnowpipeを自動化するための次のオプションがサポートされています。

  • オプション1。新しいS3イベント通知: S3バケットのターゲットパスのイベント通知を作成します。イベント通知は、ファイルをロードする準備ができると、 SQS キューを介してSnowpipeに通知します。

    これが最も一般的なオプションです。

    重要

    S3バケットに競合するイベント通知が存在する場合は、代わりにオプション2を使用します。

  • オプション2。既存のイベント通知: Amazon Simple Notification Service(SNS) をブロードキャスターとして構成し、Snowpipe自動化用のSnowflake SQS キューを含む、複数のエンドポイント(または「サブスクライバー」。例: SQS キューまたは AWS Lambdaワークロード)に通知を共有するようにします。 SNS によってパブリッシュされたS3イベント通知は、ファイルのロードする準備が整ったときに SQS キューを介してSnowpipeに通知します。

オプション1:Snowpipeを自動化するために、新しいS3イベント通知を作成する

このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーするための最も一般的なオプションについて説明します。手順では、データファイルが保存されているS3バケットのターゲットパス(または AWS の用語では「プレフィックス」)のイベント通知を作成する方法を説明します。

重要

S3バケットに対して競合するイベント通知が存在する場合は、代わりに オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する (このトピック内)を使用します。AWS ルールでは、同じターゲットパスに対して競合する通知の作成を禁じています。

次の図は、Snowpipeの自動インジェストプロセスフローを示しています。

Snowpipe Auto-ingest Process Flow
  1. データファイルはステージにロードされます。

  2. S3イベント通知は、SQS キューを介してSnowpipeにファイルをロードする準備ができたことを通知します。Snowpipeはファイルをキューにコピーします。

  3. Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。

注釈

このトピックの手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。

ステップ1:ステージを作成する(必要な場合)

CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。または、既存の外部ステージを使用できます。

注釈

クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)を参照してください。

次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは myint という名前のストレージ統合を参照します。

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = myint;

ステップ2:自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。

次の例では、ユーザーセッションのアクティブスキーマに mypipe という名前のパイプを作成します。パイプは、 mystage ステージでステージングされたファイルから mytable テーブルにデータをロードします。

create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

AUTO_INGEST=true パラメーターは、新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。

重要

パイプ定義のステージ参照を既存のパイプと比較します。同じS3バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが s3://mybucket/path1s3://mybucket/path1/path2 などの異なるレベルの細分性で同じS3バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが s3://mybucket/path1/path2 にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。

これは、Snowpipeの手動設定(自動インジェスト 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはS3イベント通知から生成されたファイルリストを受け取ります。データの重複を避けるために、追加の注意が必要です。

注釈

データベースまたはスキーマのクローンを作成すると、ソースデータベースまたはスキーマ内の外部ステージを参照するパイプを含む、 すべて のオブジェクトがクローンされます。データファイルがステージの場所(例:S3バケット)に作成されるとき、通知のコピーがステージの場所に一致するすべてのパイプに送信されます。これにより、次の動作が発生します。

  • パイプ定義の COPY ステートメントでテーブルが完全修飾されている場合( データベース名.スキーマ名.テーブル名 または スキーマ名.テーブル名 の形式)、次にSnowpipeは各パイプのソーステーブル(つまり、 COPY ステートメントの データベース.スキーマ.テーブル )に重複データをロードします。

  • テーブルがパイプ定義で完全修飾されて いない 場合、Snowpipeはデータをソースおよびクローン化されたデータベース/スキーマの同じテーブル(例: mytable)にロードします。

ステップ3:セキュリティを構成する

Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。

注釈

「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。

Snowpipeを使用するには、次の権限を持つロールが必要です。

オブジェクト

権限

注意

名前付きパイプ

OWNERSHIP

名前付きステージ

USAGE , READ

名前付きファイル形式

USAGE

オプション: ステップ1:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。

ターゲットデータベース

USAGE

ターゲットスキーマ

USAGE

ターゲットテーブル

INSERT , SELECT

GRANT <権限> ... TO ROLE コマンドを使用して、ロールに権限を付与します。

注釈

ロールを作成して権限を付与できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上、またはアカウントの CREATE ROLE 権限とグローバル MANAGE GRANTS 権限の両方を持つ別のロールのみです。

たとえば、 snowpipe_db.public データベースオブジェクトのセットと mypipe という名前のパイプにアクセスできる snowpipe1 という名前のロールを作成します。次に、ロールをユーザーに付与します。

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

ステップ4:イベント通知を構成する

S3バケットのイベント通知を構成して、新しいデータをロードできるようになったときにSnowpipeに通知します。自動インジェスト機能は、S3からSnowpipeにイベント通知を配信するために SQS キューに依存しています。

使いやすくするために、Snowpipe SQS キューはSnowflakeによって作成および管理されます。SHOW PIPES コマンド出力には、 SQS キューのAmazonリソースネーム(ARN)が表示されます。

  1. SHOW PIPES コマンドを実行します。

    SHOW PIPES;
    

    notification_channel 列のステージには、 SQS キューの ARN があります。 ARN を便利な場所にコピーします。

    注釈

    AWS ガイドラインに従って、SnowflakeはS3バケットごとに1つ以下の SQS キューを指定します。この SQS キューは、同じ AWS アカウントの複数のバケットで共有できます。 SQS キューは、S3バケットの外部ステージをターゲットテーブルに接続するすべてのパイプの通知を調整します。データファイルがバケットにアップロードされると、ステージディレクトリパスに一致するすべてのパイプが、対応するターゲットテーブルへのファイルの1回限りのロードを実行します。

  2. Amazon S3コンソールにログインします。

  3. Amazon S3ドキュメント に記載されている手順を使用して、S3バケットのイベント通知を構成します。次のようにフィールドに入力します。

    • Name :イベント通知の名前(例: Auto-ingest Snowflake)。

    • EventsObjectCreate (All) オプションを選択します。

    • Send to :ドロップダウンリストから SQS Queue を選択します。

    • SQS :ドロップダウンリストから Add SQS queue ARN を選択します。

    • SQS queue ARN : SHOW PIPES 出力から SQS キュー名を貼り付けます。

注釈

これらの手順は、S3バケット全体のアクティビティをモニターする単一のイベント通知を作成します。これが最も簡単なアプローチです。この通知は、S3バケットディレクトリでより詳細なレベルで構成されたすべてのパイプを処理します。Snowpipeは、パイプ定義で指定されたデータファイルのみを読み込みます。ただし、パイプ定義外のアクティビティに関する大量の通知は、Snowpipeが通知をフィルタリングしてアクションを実行するレートに悪影響を与える可能性があります。

または、上記の手順で、1つ以上のパスおよび/またはファイル拡張子(または AWS の用語では プレフィックス および サフィックス)を構成して、イベントアクティビティをフィルタリングします。手順については、関連する AWS ドキュメントトピック のオブジェクトキー名フィルタリング情報をご参照ください。通知で監視する追加のパスまたはファイル拡張子ごとにこれらの手順を繰り返します。

AWS では、これらの通知 キュー構成 の数がS3バケットごとに最大100に制限されます。

また、 AWS では、同じS3バケットのキュー構成の重複(イベント通知全体)が許可されません。たとえば、既存の通知が s3://mybucket/load/path1 に構成されている場合、 s3://mybucket/load などの上位レベルで別の通知を作成することはできません。その逆も同様です。

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。

ステップ5:履歴ファイルをロードする

SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。

オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する

このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする方法について説明します。この手順では、 Amazon Simple Notification Service(SNS) をブロードキャスターとして構成して、S3バケットのイベント通知を複数のサブスクライバー(例: SQS キューまたは AWS Lambdaワークロード)にパブリッシュする方法を説明します。サブスクライバーには、Snowpipe自動化用のSnowflake SQS キューを含みます。

注釈

これらの手順は、データファイルが置かれているS3バケットのターゲットパスにイベント通知が存在することを前提としています。イベント通知が存在しない場合は、次のいずれかを実行します。

次の図は、Amazon SNSを使用したSnowpipeの自動インジェストのプロセスフローを示しています。

Snowpipe Auto-ingest Process Flow with Amazon SNS
  1. データファイルはステージにロードされます。

  2. SNS によってパブリッシュされたS3イベント通知は、ファイルがロード可能な状態であることを SQS キュー経由でSnowpipeに通知します。Snowpipeはファイルをキューにコピーします。

  3. Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。

注釈

この手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。

前提条件:Amazon SNS トピックとサブスクリプションを作成する

  1. AWS アカウントで SNS トピックを作成して、S3バケットのSnowflakeステージロケーションのすべてのメッセージを処理します。

  2. S3イベント通知(例:他の SQS キューまたは AWS Lambdaワークロード)のターゲットの宛先をこのトピックにサブスクライブします。SNS バケットのイベント通知をトピックのすべてのサブスクライバーにパブリッシュします。

手順については、 SNS ドキュメント をご参照ください。

ステップ1:Snowflake SQS キューを SNS トピックに登録する

  1. AWS 管理コンソールにログインします。

  2. ホームダッシュボードから、 Simple Notification Service (SNS)を選択します。

  3. 左側のナビゲーションペインから Topics を選択します。

  4. S3バケットのトピックを見つけます。トピック ARN に注意します。

  5. Snowflakeクライアントを使用して、 SNS トピック ARNで SYSTEM$GET_AWS_SNS_IAM_POLICY システム関数をクエリします。

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    

    この関数は、 SNS トピックをサブスクライブするSnowflake SQS キュー許可を付与する IAM ポリシーを返します。

    例:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
  6. AWS 管理コンソールに戻ります。左側のナビゲーションペインから Topics を選択します。

  7. S3バケットのトピックの横にあるチェックボックスを選択し、 Actions メニューから Edit topic policy をクリックします。 Advanced view タブをクリックして、ポリシーの JSON 形式を編集します。

  8. SYSTEM$GET_AWS_SNS_IAM_POLICY 関数の結果から追加された IAM ポリシーを JSON ドキュメントにマージします。

    例:

    元の IAM ポリシー(略称):

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         }
       ]
     }
    

    統合された IAM ポリシー:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         }
       ]
     }
    
  9. 追加のポリシー許可を追加して、S3がバケットのイベント通知を SNS トピックに公開できるようにします。

    例(これらの手順全体で使用される SNS トピック ARN およびS3バケットを使用):

    {
        "Sid":"s3-event-notifier",
        "Effect":"Allow",
        "Principal":{
           "Service":"s3.amazonaws.com"
        },
        "Action":"SNS:Publish",
        "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
        "Condition":{
           "ArnLike":{
              "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
           }
        }
     }
    

    統合された IAM ポリシー:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         },
         {
            "Sid":"s3-event-notifier",
            "Effect":"Allow",
            "Principal":{
               "Service":"s3.amazonaws.com"
            },
            "Action":"SNS:Publish",
            "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
            "Condition":{
               "ArnLike":{
                  "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
               }
            }
          }
       ]
     }
    
  10. Update policy ボタンをクリックします。

ステップ2:ステージを作成する(必要な場合)

CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。

または、既存の外部ステージを使用できます。

注釈

クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)を参照してください。

次の例では、ユーザーセッションのアクティブスキーマに mystage という名前のステージを作成します。クラウドストレージ URL にはパス files が含まれています。ステージは myint という名前のストレージ統合を参照します。

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = myint;

ステップ3:自動インジェストを有効にしたパイプを作成する

CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。COPY ステートメントで、 前提条件:Amazon SNS とサブスクリプションを作成する から SNS トピック ARN を識別します。

次の例では、ユーザーセッションのアクティブスキーマに mypipe という名前のパイプを作成します。パイプは、 mystage ステージでステージングされたファイルから mytable テーブルにデータをロードします。

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

条件:

AUTO_INGEST = true

新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。

AWS_SNS_TOPIC = '<SNSトピックARN>'

現在の例 arn:aws:sns:us-west-2:001234567890:s3_mybucket のように、S3バケットの SNS トピックの ARN を指定します。CREATE PIPE ステートメントは、指定された SNS トピックにSnowflake SQS キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされたインジェストキューにのみファイルをコピーします。

パイプからいずれかのパラメーターを削除するには、現在、 CREATE OR REPLACE PIPE 構文を使用してパイプを再作成する必要があります。

重要

パイプ定義のステージ参照を既存のパイプと比較します。同じS3バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが s3://mybucket/path1s3://mybucket/path1/path2 などの異なるレベルの細分性で同じS3バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが s3://mybucket/path1/path2 にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。

これは、Snowpipeの手動設定(自動インジェスト 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはS3イベント通知から生成されたファイルリストを受け取ります。データの重複を避けるために、追加の注意が必要です。

ステップ4:セキュリティを構成する

Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。

注釈

「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。

Snowpipeを使用するには、次の権限を持つロールが必要です。

オブジェクト

権限

注意

名前付きパイプ

OWNERSHIP

名前付きステージ

USAGE , READ

名前付きファイル形式

USAGE

オプション: ステップ2:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。

ターゲットデータベース

USAGE

ターゲットスキーマ

USAGE

ターゲットテーブル

INSERT , SELECT

GRANT <権限> ... TO ROLE コマンドを使用して、ロールに権限を付与します。

注釈

セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上のみがロールを作成できます。

たとえば、 snowpipe_db.public データベースオブジェクトのセットと mypipe という名前のパイプにアクセスできる snowpipe1 という名前のロールを作成します。次に、ロールをユーザーに付与します。

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage, read on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

自動インジェストを使用したSnowpipeが構成されました。

新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。

ステップ5:履歴ファイルをロードする

SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。

SYSTEM$PIPE_STATUS 出力

SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。

TRUEに設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。

{"executionState":"<値>","oldestFileTimestamp":<値>,"pendingFileCount":<値>,"notificationChannelName":"<値>","numOutstandingMessagesOnChannel":<値値>,"lastReceivedMessageTimestamp":"<値>","lastForwardedMessageTimestamp":"<値>","error":<値>,"fault":<値>}

条件:

executionState

パイプの現在の実行状態は、次のいずれかです。

  • RUNNING (つまり、すべてが正常です。Snowflakeはこのパイプのファイルをアクティブに処理する場合としない場合があります)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

現在キューに入れられているデータファイルの中で最も早いタイムスタンプ(該当する場合)。ファイルがキューに追加されるときにタイムスタンプが設定されます。

pendingFileCount

現在パイプによって処理されているファイルの数。パイプが一時停止されている場合、パイプが一時停止される 前に キューに入れられたファイルが処理されるため、この値は減少します。この値が 0 の場合、このパイプのキューに入れられているファイルがないか、パイプが事実上一時停止しています。

notificationChannelName

パイプに関連付けられたAmazon SQS キュー。

numOutstandingMessagesOnChannel

キューに入れられたがまだ受信されていない SQS キュー内のメッセージの数。

lastReceivedMessageTimestamp

SQS キューから受信した最後のメッセージのタイムスタンプ。たとえば、メッセージに関連付けられたパス/プレフィックスがパイプ定義のパス/プレフィックスと一致しない場合など、このメッセージが特定のパイプに適用されない場合があります。また、作成されたデータオブジェクトによってトリガーされたメッセージのみが自動インジェストパイプによって消費されます。

lastForwardedMessageTimestamp

パイプに転送された一致するパス/プレフィックスを持つ最後の「オブジェクトの作成」イベントメッセージのタイムスタンプ。

error

パイプが実行のために最後にコンパイルされたときに生成されたエラーメッセージ(該当する場合)。多くの場合、権限の問題またはオブジェクトの削除により、必要なオブジェクト(テーブル、ステージ、ファイル形式)へのアクセスに問題が生じたことが原因です。

fault

最新の内部Snowflakeプロセスエラー(該当する場合)。主にSnowflakeがデバッグ目的で使用します。