Amazon S3用Snowpipeの自動化¶
このトピックでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする手順を説明します。
Snowflakeは、コスト、イベントノイズ、遅延を低減するために、Snowpipeでサポートされているイベントのみを送信することをお勧めします。
このトピックの内容:
クラウドプラットフォームのサポート¶
S3イベントメッセージを使用した自動Snowpipeデータロードのトリガーは、 サポートされているすべてのクラウドプラットフォーム でホストされているSnowflakeアカウントでサポートされています。
ネットワークトラフィック¶
Virtual Private Snowflake(VPS) および AWS PrivateLink のお客様へのノート:
Amazon SQS 通知を使用したSnowpipeの自動化は正常に機能します。ただし、 VPC ( VPS を含む)内の AWS クラウドストレージは独自のメッセージングサービス(Amazon SQS、Amazon Simple Notification Service)と通信できますが、このトラフィックは、 VPC 外のAmazonの安全なネットワーク上のサーバー間を流れます。したがって、このトラフィックは VPC によって保護されません。
クラウドストレージへの安全なアクセスの構成¶
注釈
データファイルを格納するS3バケットへの安全なアクセスをすでに構成している場合は、このセクションをスキップできます。
このセクションでは、ストレージ統合を使用して、Snowflakeが外部(つまり、S3)ステージで参照されるAmazon S3バケットに対してデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスのSnowflakeオブジェクトであり、秘密キーまたはアクセストークンといった、クラウドプロバイダーの明示的な認証情報を渡す必要がありません。統合オブジェクトには、 AWS IDおよびアクセス管理(IAM)ユーザー IDが保存されます。組織の管理者が、 AWS アカウントの統合 IAM ユーザー権限を付与します。
統合では、統合を使用する外部ステージを作成するときにユーザーが指定できる場所を制限するバケット(およびオプションのパス)もリストできます。
注釈
このセクションの手順を完了するには、 IAM ポリシーおよびロールの作成および管理のための AWS の権限が必要です。AWS 管理者でない場合は、 AWS 管理者にこれらのタスクを実行するように依頼します。
現在、ストレージ統合を使用した 政府リージョン のS3ストレージへのアクセスは、同じ政府リージョンの AWS でホストされているSnowflakeアカウントに限定されていることに注意してください。直接認証情報を使用した、政府リージョン外でホストされているアカウントからS3ストレージへのアクセスがサポートされています。
次の図は、S3ステージの統合フローを示しています。
外部(つまり、S3)ステージは、その定義でストレージ統合オブジェクトを参照します。
Snowflakeは、ストレージ統合をアカウント用に作成されたS3 IAM ユーザーに自動的に関連付けます。Snowflakeは、SnowflakeアカウントのすべてのS3ストレージ統合によって参照される単一の IAM ユーザーを作成します。
組織の AWS 管理者は、 IAM ユーザーにステージ定義で参照されているバケットにアクセスする権限を付与します。多数の外部ステージオブジェクトでは、異なるバケットとパスを参照し、認証に同じストレージ統合を使用できます。
ユーザーがステージに対してデータをロードまたはアンロードすると、Snowflakeは、アクセスを許可または拒否する前に、バケット上の IAM ユーザーに付与された権限を確認します。
注釈
このオプションを強くお勧めします。これにより、クラウドストレージにアクセスするときに IAM 認証情報を提供する必要がなくなります。その他のストレージアクセスオプションについては Amazon S3へのセキュアアクセスの構成 をご参照ください。
このセクションの内容:
ステップ1: S3バケットのアクセス許可を構成する¶
AWS アクセス制御の要件¶
Snowflakeでは、フォルダー(およびサブフォルダー)内のファイルにアクセスできるようにするために、S3バケットおよびフォルダーに対する次の権限が必要です。
s3:GetBucketLocation
s3:GetObject
s3:GetObjectVersion
s3:ListBucket
ベストプラクティスとして、Snowflakeは、S3バケットへのSnowflakeアクセス用の IAM ポリシーを作成することをお勧めします。その後、ポリシーをロールに添付し、ロールのために AWS によって生成されたセキュリティ認証情報を使用して、バケット内のファイルにアクセスできます。
IAM ポリシーの作成¶
次の詳細な手順では、S3バケットにアクセスするために AWS 管理コンソールでSnowflakeのアクセス許可を構成する方法について説明します。
AWS 管理コンソールにログインします。
ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。
左側のナビゲーションペインから Account settings を選択します。
Security Token Service Regions リストを展開し、アカウントがある 地域 に対応する AWS 地域を見つけ、ステータスが Inactive の場合は Activate を選択します。
左側のナビゲーションペインから Policies を選択します。
Create Policy をクリックします。
JSON タブをクリックします。
SnowflakeがS3バケットとフォルダーにアクセスできるようにするポリシードキュメントを追加します。
次のポリシー( JSON 形式)は、単一のバケットとフォルダーパスを使用してデータをロードまたはアンロードするために必要な権限をSnowflakeに提供します。
テキストをコピーしてポリシーエディターに貼り付けます。
注釈
bucket
とprefix
を実際のバケット名とフォルダーパスプレフィックスに確実に置き換えます。政府リージョン のバケットのAmazonリソースネーム(ARN)には、
arn:aws-us-gov:s3:::
プレフィックスがあります。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": "arn:aws:s3:::<bucket>/<prefix>/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetBucketLocation" ], "Resource": "arn:aws:s3:::<bucket>", "Condition": { "StringLike": { "s3:prefix": [ "<prefix>/*" ] } } } ] }
注釈
"s3:prefix":
条件を["*"]
または["< パス>/*"]
に設定すると、指定されたバケット内のすべてのプレフィックスまたはバケット内のパスへのアクセスがそれぞれ許可されます。AWS ポリシーは、さまざまなセキュリティユースケースをサポートします。
Review policy をクリックします。
ポリシー名(例:
snowflake_access
)とオプションの説明を入力します。 Create policy をクリックします。
ステップ2: IAM AWS ロールを作成する¶
AWS 管理コンソールでSnowflakeのアクセス許可を設定するには、次の手順を実行します。
AWS 管理コンソールにログインします。
ホームダッシュボードから、 Identity & Access Management (IAM)を選択します。
左側のナビゲーションペインから Roles を選択します。
Create role を選択します。
信頼できるエンティティタイプとして Another AWS account を選択します。
Account ID フィールドに、自分の AWS アカウント ID を一時的に入力します。後で、信頼関係を変更し、Snowflakeへのアクセスを許可します。
Require external ID オプションを選択します。外部 ID は、Snowflakeなどのサードパーティに AWS リソース(S3バケットなど)へのアクセスを許可するために使用されます。
0000
などのプレースホルダー ID を入力します。後のステップで、 IAM ロールの信頼関係を変更し、ストレージ統合の外部 ID を指定します。Next を選択します。
ステップ1: S3バケットのアクセス許可を構成する (このトピック内)で作成したポリシーを選択します。
Next を選択します。
ロールの名前と説明を入力し、 Create role を選択します。
これで、バケットの IAM ポリシー、および IAM ロールを作成し、ロールにポリシーを添付しました。
ロールの概要ページにある Role ARN 値を見つけて記録します。次のステップでは、このロールを参照するSnowflake統合を作成します。
注釈
Snowflakeは、一時的な認証情報を一定期間キャッシュしますが、60分の有効期限を越えることができません。Snowflakeからのアクセスを取り消すと、ユーザーはキャッシュの有効期限が切れるまでファイルをリストし、クラウドストレージの場所からデータにアクセスできる場合があります。
ステップ3:Snowflakeでクラウドストレージ統合を作成する¶
CREATE STORAGE INTEGRATION コマンドを使用してストレージ統合を作成します。ストレージ統合は、S3クラウドストレージ用に生成されたIDおよびアクセス管理(IAM)ユーザーと、許可またはブロックされたストレージロケーション(バケットなど)のオプションセットを保存するSnowflakeオブジェクトです。組織のクラウドプロバイダー管理者は、生成されたユーザーにストレージの場所に対するアクセス許可を付与します。このオプションにより、ユーザーはステージの作成時またはデータのロード時に、認証情報の提供を回避できます。
単一のストレージ統合で複数の外部(つまり、S3)ステージをサポートできます。ステージ定義にある URL は、 STORAGE_ALLOWED_LOCATIONS パラメーターのために指定されたS3バケット(およびオプションのパス)に合わせる必要があります。
注釈
この SQL コマンドを実行できるのは、アカウント管理者( ACCOUNTADMIN ロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールのみです。
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<iam_role>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
条件:
integration_name
は、新しい統合の名前です。iam_role
は、 ステップ2: AWS で IAM ロールを作成する (このトピック内)で作成したロールのAmazonリソースネーム(ARN)です。bucket
は、データファイルを保存するS3バケットの名前です(例:mybucket
)。必須の STORAGE_ALLOWED_LOCATIONS パラメーターおよびオプションの STORAGE_BLOCKED_LOCATIONS パラメーターは、この統合を参照するステージが作成または変更されたときに、それぞれこれらのバケットへのアクセスを制限またはブロックします。path
は、バケット内のオブジェクトを細かく制御するために使用できるオプションのパスです。
次の例では、アカウント内のすべてのバケットへのアクセスを許可し、定義された sensitivedata
フォルダーへのアクセスをブロックする統合を作成します。
この統合も使用する追加の外部ステージは、許可されたバケットとパスを参照できます。
CREATE STORAGE INTEGRATION s3_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'S3' ENABLED = TRUE STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole' STORAGE_ALLOWED_LOCATIONS = ('*') STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
注釈
オプションで、 STORAGE_AWS_EXTERNAL_ID パラメーターを使用して、独自の外部 ID を指定します。複数の外部ボリュームやストレージ統合で同じ外部 ID を使用する場合は、このオプションを選択します。
ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する¶
Snowflakeアカウント用に自動的に作成された IAM ユーザーの ARN を取得するには、 DESCRIBE INTEGRATION を使用します。
DESC INTEGRATION <integration_name>;
条件:
integration_name
は、 ステップ3: Snowflakeでクラウドストレージ統合を作成する (このトピック内)で作成した統合の名前です。
例:
DESC INTEGRATION s3_int; +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +---------------------------+---------------+--------------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/ | [] | | STORAGE_AWS_IAM_USER_ARN | String | arn:aws:iam::123456789001:user/abc1-b-self1234 | | | STORAGE_AWS_ROLE_ARN | String | arn:aws:iam::001234567890:role/myrole | | | STORAGE_AWS_EXTERNAL_ID | String | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq= | | +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
次のプロパティの値を記録します。
プロパティ
説明
STORAGE_AWS_IAM_USER_ARN
Snowflakeアカウント用に作成された AWS IAM ユーザー。例えば、
arn:aws:iam::123456789001:user/abc1-b-self1234
。Snowflakeは、Snowflakeアカウント全体用に単一の IAM ユーザーをプロビジョニングします。アカウント内のすべてのS3ストレージ統合では、その IAM ユーザーが使用されます。STORAGE_AWS_EXTERNAL_ID
Snowflakeが AWS との信頼関係を確立するために使用する外部 ID。ストレージ統合の作成時に外部 ID (
STORAGE_AWS_EXTERNAL_ID
)を指定しなかった場合、Snowflakeは ID を生成して使用します。これらの値は次のセクションで提供します。
ステップ5:バケットオブジェクトにアクセスするために IAM ユーザー権限を付与する¶
次の手順では、S3バケットを使用してデータをロードおよびアンロードできるように、 AWS 管理コンソールでSnowflakeの IAM アクセス許可を構成する方法を説明します。
AWS 管理コンソールにログインします。
Identity & Access Management (IAM)を選択します。
左側のナビゲーションペインから Roles を選択します。
ステップ2: IAM AWS ロールを作成する (このトピック内)で作成したロールを選択します。
Trust relationships タブを選択します。
Edit trust relationship を選択します。
ステップ4:Snowflakeアカウントの AWS IAM ユーザーを取得する (このトピック内)で記録した DESC STORAGE INTEGRATION 出力値でポリシードキュメントを変更します。
IAM ロールのポリシードキュメント
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "AWS": "<snowflake_user_arn>" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "sts:ExternalId": "<snowflake_external_id>" } } } ] }
条件:
snowflake_user_arn
は、記録した STORAGE_AWS_IAM_USER_ARN 値です。snowflake_external_id
は、記録した STORAGE_AWS_EXTERNAL_ID 値です。この例では、
snowflake_external_id
の値はMYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=
です。注釈
セキュリティ上の理由から、外部 ID を指定せずに新しいストレージ統合を作成した場合(または CREATE OR REPLACE STORAGE INTEGRATION 構文を使用して既存のストレージ統合を再作成した場合)、新しい統合には 異なる 外部 ID が含まれるため、信頼ポリシーを更新しない限り信頼関係を解決できません。
Update Trust Policy ボタンを選択します。変更が保存されます。
注釈
Snowflakeは、一時的な認証情報を一定期間キャッシュしますが、60分の有効期限を越えることができません。Snowflakeからのアクセスを取り消すと、ユーザーはキャッシュの有効期限が切れるまでファイルを一覧表示し、クラウドストレージの場所からデータを読み込むことができる場合があります。
正しいオプションの決定¶
先に進む前に、データファイルが置かれているS3バケットのターゲットパス(または AWS 用語では「プレフィックス」)にS3イベント通知が存在するかどうかを確認します。AWS ルールでは、同じパスに対して競合する通知を作成することを禁止しています。
Amazon SQS を使用してSnowpipeを自動化するための次のオプションがサポートされています。
オプション1。新しいS3イベント通知: S3バケットのターゲットパスのイベント通知を作成します。イベント通知は、ファイルをロードする準備ができると、 SQS キューを介してSnowpipeに通知します。
重要
S3バケットに競合するイベント通知が存在する場合は、代わりにオプション2を使用します。
オプション2。既存のイベント通知: Amazon Simple Notification Service(SNS) をブロードキャスターとして構成し、Snowpipe自動化用のSnowflake SQS キューを含む、複数のエンドポイント(または「サブスクライバー」。例: SQS キューまたは AWS Lambdaワークロード)に通知を共有するようにします。SNS によってパブリッシュされたS3イベント通知は、ファイルのロードする準備が整ったときに SQS キューを介してSnowpipeに通知します。
注釈
ステージ、パイプ、ロード履歴の複製 を使用する場合は、このオプションを推奨します。複製グループまたはフェールオーバーグループを作成した後に、オプション1からオプション2に移行することもできます。詳細については、 Amazon Simple Notification Service(SNS)への移行 をご参照ください。
オプション3。Snowpipeを自動化するためのAmazon EventBridge の設定: オプション2と同様に、S3バケットに対して Amazon EventBridge を有効にして、 SNS トピックに通知を送信するルールを作成することもできます。
オプション1: Snowpipeを自動化するために、新しいS3イベント通知を作成する¶
このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーするための最も一般的なオプションについて説明します。手順では、データファイルが保存されているS3バケットのターゲットパス(または AWS の用語では「プレフィックス」)のイベント通知を作成する方法を説明します。
重要
S3バケットに対して競合するイベント通知が存在する場合は、代わりに オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する (このトピック内)を使用します。AWS ルールでは、同じターゲットパスに対して競合する通知の作成を禁じています。
次の図は、Snowpipeの自動インジェストプロセスフローを示しています。
データファイルはステージにロードされます。
S3イベント通知は、SQS キューを介してSnowpipeにファイルをロードする準備ができたことを通知します。Snowpipeはファイルをキューにコピーします。
Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。
注釈
このトピックの手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。
ステップ1: ステージを作成する(必要な場合)¶
CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。または、既存の外部ステージを使用できます。
注釈
クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。
CREATE STAGE ステートメントでストレージ統合を参照するには、ロールにストレージ統合オブジェクトに対する USAGE 権限が必要です。
次の例では、ユーザーセッションのアクティブスキーマに mystage
という名前のステージを作成します。クラウドストレージ URL にはパス files
が含まれています。ステージは my_storage_int
という名前のストレージ統合を参照します。
USE SCHEMA snowpipe_db.public; CREATE STAGE mystage URL = 's3://mybucket/load/files' STORAGE_INTEGRATION = my_storage_int;
ステップ2: 自動インジェストを有効にしたパイプを作成する¶
CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。
次の例では、ユーザーセッションのアクティブスキーマに mypipe
という名前のパイプを作成します。パイプは、 mystage
ステージでステージングされたファイルから mytable
テーブルにデータをロードします。
create pipe snowpipe_db.public.mypipe auto_ingest=true as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');
AUTO_INGEST=true
パラメーターは、新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。
重要
パイプ定義のステージ参照を既存のパイプと比較します。同じS3バケットのディレクトリパスが重複していないことを確認します。そうしないと、複数のパイプが同じデータファイルのセットを複数回、1つ以上のターゲットテーブルにロードする可能性があります。これは、たとえば、複数のステージが s3://mybucket/path1
や s3://mybucket/path1/path2
などの異なるレベルの細分性で同じS3バケットを参照する場合に発生する可能性があります。このユースケースでは、ファイルが s3://mybucket/path1/path2
にステージングされている場合、両方のステージのパイプがファイルのコピーをロードします。
これは、Snowpipeの手動設定(自動インジェスト 無効)とは異なり、ユーザーがファイルの名前付きセットを REST API に送信して、ロードするファイルをキューに入れる必要があります。自動インジェストを有効にすると、各パイプはS3イベント通知から生成されたファイルリストを受け取ります。データの重複を避けるために、追加の注意が必要です。
ステップ3: セキュリティを構成する¶
Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。
注釈
「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。
Snowpipeを使用するには、次の権限を持つロールが必要です。
オブジェクト |
権限 |
メモ |
---|---|---|
名前付きパイプ |
OWNERSHIP |
|
名前付きステージ |
USAGE , READ |
|
名前付きファイル形式 |
USAGE |
オプション: ステップ1:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。 |
ターゲットデータベース |
USAGE |
|
ターゲットスキーマ |
USAGE |
|
ターゲットテーブル |
INSERT , SELECT |
GRANT <権限> コマンドを使用して、ロールに権限を付与します。
注釈
ロールを作成して権限を付与できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上、またはアカウントの CREATE ROLE 権限とグローバル MANAGE GRANTS 権限の両方を持つ別のロールのみです。
たとえば、 snowpipe_db.public
データベースオブジェクトのセットと mypipe
という名前のパイプにアクセスできる snowpipe_role
という名前のロールを作成します。次に、ロールをユーザーに付与します。
-- Create a role to contain the Snowpipe privileges use role securityadmin; create or replace role snowpipe_role; -- Grant the required privileges on the database objects grant usage on database snowpipe_db to role snowpipe_role; grant usage on schema snowpipe_db.public to role snowpipe_role; grant insert, select on snowpipe_db.public.mytable to role snowpipe_role; grant usage on stage snowpipe_db.public.mystage to role snowpipe_role; -- Pause the pipe for OWNERSHIP transfer alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true; -- Grant the OWNERSHIP privilege on the pipe object grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role; -- Grant the role to a user grant role snowpipe_role to user jsmith; -- Set the role as the default role for the user alter user jsmith set default_role = snowpipe_role; -- Resume the pipe alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
ステップ4: イベント通知を構成する¶
S3バケットのイベント通知を構成して、新しいデータをロードできるようになったときにSnowpipeに通知します。自動インジェスト機能は、S3からSnowpipeにイベント通知を配信するために SQS キューに依存しています。
使いやすくするために、Snowpipe SQS キューはSnowflakeによって作成および管理されます。SHOW PIPES コマンド出力には、 SQS キューのAmazonリソースネーム(ARN)が表示されます。
SHOW PIPES コマンドを実行します。
SHOW PIPES;
notification_channel
列のステージには、 SQS キューの ARN があります。 ARN を便利な場所にコピーします。注釈
AWS ガイドラインに従って、SnowflakeはS3バケットごとに1つ以下の SQS キューを指定します。この SQS キューは、同じ AWS アカウントの複数のバケットで共有できます。 SQS キューは、S3バケットの外部ステージをターゲットテーブルに接続するすべてのパイプの通知を調整します。データファイルがバケットにアップロードされると、ステージディレクトリパスに一致するすべてのパイプが、対応するターゲットテーブルへのファイルの1回限りのロードを実行します。
Amazon S3コンソールにログインします。
Amazon S3ドキュメント に記載されている手順を使用して、S3バケットのイベント通知を構成します。次のようにフィールドに入力します。
Name :イベント通知の名前(例:
Auto-ingest Snowflake
)。Events : ObjectCreate (All) オプションを選択します。
Send to :ドロップダウンリストから SQS Queue を選択します。
SQS :ドロップダウンリストから Add SQS queue ARN を選択します。
SQS queue ARN : SHOW PIPES 出力から SQS キュー名を貼り付けます。
注釈
これらの手順は、S3バケット全体のアクティビティをモニターする単一のイベント通知を作成します。これが最も簡単なアプローチです。この通知は、S3バケットディレクトリでより詳細なレベルで構成されたすべてのパイプを処理します。Snowpipeは、パイプ定義で指定されたデータファイルのみを読み込みます。ただし、パイプ定義外のアクティビティに関する大量の通知は、Snowpipeが通知をフィルタリングしてアクションを実行するレートに悪影響を与える可能性があります。
または、上記の手順で、1つ以上のパスおよび/またはファイル拡張子(または AWS の用語では プレフィックス および サフィックス)を構成して、イベントアクティビティをフィルターします。手順については、関連する AWS ドキュメントトピック のオブジェクトキー名フィルタリング情報をご参照ください。通知でモニターする追加のパスまたはファイル拡張子ごとにこれらのステップを繰り返します。
AWS では、これらの通知 キュー構成 の数がS3バケットごとに最大100に制限されます。
また、 AWS では、同じS3バケットのキュー構成の重複(イベント通知全体)が許可されません。たとえば、既存の通知が s3://mybucket/load/path1
に構成されている場合、 s3://mybucket/load
などの上位レベルで別の通知を作成することはできません。その逆も同様です。
自動インジェストを使用したSnowpipeが構成されました。
新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。
ステップ5: 履歴ファイルをロードする¶
SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。
ステップ6: ステージングされたファイルを削除する¶
データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。
オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する¶
このセクションでは、S3バケットの Amazon SQS (Simple Queue Service) 通知を使用して、Snowpipeデータのロードを自動的にトリガーする方法について説明します。この手順では、 Amazon Simple Notification Service(SNS) をブロードキャスターとして構成して、S3バケットのイベント通知を複数のサブスクライバー(例: SQS キューまたは AWS Lambdaワークロード)にパブリッシュする方法を説明します。サブスクライバーには、Snowpipe自動化用のSnowflake SQS キューを含みます。
注釈
これらの手順は、データファイルが置かれているS3バケットのターゲットパスにイベント通知が存在することを前提としています。イベント通知が存在しない場合は、次のいずれかを実行します。
その代わり、 オプション1:Snowpipeを自動化するために、新しいS3イベント通知を作成する (このトピック内)に従います。
S3バケットのイベント通知を作成し、このトピックの手順に従って進みます。詳細については、 Amazon S3のドキュメント をご参照ください。
次の図は、Amazon SNSを使用したSnowpipeの自動インジェストのプロセスフローを示しています。
データファイルはステージにロードされます。
SNS によってパブリッシュされたS3イベント通知は、ファイルがロード可能な状態であることを SQS キュー経由でSnowpipeに通知します。Snowpipeはファイルをキューにコピーします。
Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。
注釈
この手順では、データがロードされるSnowflakeデータベースにターゲットテーブルが既に存在することを前提としています。
Snowpipeの自動インジェストは、 AWS KMSで暗号化された SNS トピックをサポートしています。詳細については、 休眠中の暗号化 をご参照ください。
前提条件:Amazon SNS トピックとサブスクリプションを作成する¶
AWS アカウントで SNS トピックを作成して、S3バケットのSnowflakeステージロケーションのすべてのメッセージを処理します。
S3イベント通知(例:他の SQS キューまたは AWS Lambdaワークロード)のターゲットの宛先をこのトピックにサブスクライブします。SNS バケットのイベント通知をトピックのすべてのサブスクライバーにパブリッシュします。
手順については、 SNS ドキュメント をご参照ください。
ステップ1:Snowflake SQS キューを SNS トピックに登録する¶
AWS 管理コンソールにログインします。
ホームダッシュボードから、 Simple Notification Service (SNS)を選択します。
左側のナビゲーションペインから Topics を選択します。
S3バケットのトピックを見つけます。トピック ARN に注意します。
Snowflakeクライアントを使用して、 SNS トピック ARNで SYSTEM$GET_AWS_SNS_IAM_POLICY システム関数をクエリします。
select system$get_aws_sns_iam_policy('<sns_topic_arn>');
この関数は、 SNS トピックをサブスクライブするSnowflake SQS キュー許可を付与する IAM ポリシーを返します。
例:
select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket'); +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET') | +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]} | +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
AWS 管理コンソールに戻ります。左側のナビゲーションペインから Topics を選択します。
S3バケットのトピックを選択し、 Edit ボタンをクリックします。 Edit ページが開きます。
Access policy - Optional をクリックして、ページのこの領域を展開します。
SYSTEM$GET_AWS_SNS_IAM_POLICY 関数の結果から追加された IAM ポリシーを JSON ドキュメントにマージします。
例:
元の IAM ポリシー(略称):
{ "Version":"2008-10-17", "Id":"__default_policy_ID", "Statement":[ { "Sid":"__default_statement_ID", "Effect":"Allow", "Principal":{ "AWS":"*" } .. } ] }
統合された IAM ポリシー:
{ "Version":"2008-10-17", "Id":"__default_policy_ID", "Statement":[ { "Sid":"__default_statement_ID", "Effect":"Allow", "Principal":{ "AWS":"*" } .. }, { "Sid":"1", "Effect":"Allow", "Principal":{ "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234" }, "Action":[ "sns:Subscribe" ], "Resource":[ "arn:aws:sns:us-west-2:001234567890:s3_mybucket" ] } ] }
追加のポリシー許可を追加して、S3がバケットのイベント通知を SNS トピックに公開できるようにします。
例(これらの手順全体で使用される SNS トピック ARN およびS3バケットを使用):
{ "Sid":"s3-event-notifier", "Effect":"Allow", "Principal":{ "Service":"s3.amazonaws.com" }, "Action":"SNS:Publish", "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket", "Condition":{ "ArnLike":{ "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket" } } }
統合された IAM ポリシー:
{ "Version":"2008-10-17", "Id":"__default_policy_ID", "Statement":[ { "Sid":"__default_statement_ID", "Effect":"Allow", "Principal":{ "AWS":"*" } .. }, { "Sid":"1", "Effect":"Allow", "Principal":{ "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234" }, "Action":[ "sns:Subscribe" ], "Resource":[ "arn:aws:sns:us-west-2:001234567890:s3_mybucket" ] }, { "Sid":"s3-event-notifier", "Effect":"Allow", "Principal":{ "Service":"s3.amazonaws.com" }, "Action":"SNS:Publish", "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket", "Condition":{ "ArnLike":{ "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket" } } } ] }
Save changes ボタンをクリックします。
ステップ2: ステージを作成する(必要な場合)¶
CREATE STAGE コマンドを使用して、S3バケットを参照する外部ステージを作成します。Snowpipeはステージからデータファイルをフェッチし、ターゲットテーブルにロードする前に一時的にキューに入れます。
または、既存の外部ステージを使用できます。
注釈
クラウドストレージの場所への安全なアクセスを構成するには、 クラウドストレージへの安全なアクセスの構成 (このトピック内)をご参照ください。
次の例では、ユーザーセッションのアクティブスキーマに mystage
という名前のステージを作成します。クラウドストレージ URL にはパス files
が含まれています。ステージは my_storage_int
という名前のストレージ統合を参照します。
CREATE STAGE mystage URL = 's3://mybucket/load/files' STORAGE_INTEGRATION = my_storage_int;
ステップ3: 自動インジェストを有効にしたパイプを作成する¶
CREATE PIPE コマンドを使用してパイプを作成します。パイプは、Snowpipeがインジェスションキューからターゲットテーブルにデータをロードするために使用する COPY INTO <テーブル> ステートメントを定義します。COPY ステートメントで、 前提条件:Amazon SNS とサブスクリプションを作成する から SNS トピック ARN を識別します。
次の例では、ユーザーセッションのアクティブスキーマに mypipe
という名前のパイプを作成します。パイプは、 mystage
ステージでステージングされたファイルから mytable
テーブルにデータをロードします。
create pipe snowpipe_db.public.mypipe auto_ingest=true aws_sns_topic='<sns_topic_arn>' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');
条件:
AUTO_INGEST = true
新しいデータをロードする準備ができたときに、S3バケットから SQS キューに送信されたイベント通知を読み取ることを指定します。
AWS_SNS_TOPIC = '<SNSトピックARN>'
現在の例
arn:aws:sns:us-west-2:001234567890:s3_mybucket
のように、S3バケットの SNS トピックの ARN を指定します。CREATE PIPE ステートメントは、指定された SNS トピックにSnowflake SQS キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされたインジェストキューにのみファイルをコピーします。
パイプからいずれかのパラメーターを削除するには、現在、 CREATE OR REPLACE PIPE 構文を使用してパイプを再作成する必要があります。
重要
COPY INTO <テーブル> ステートメントの保存場所の参照が、アカウントにある既存のパイプの参照と重複していないことを確認します。そうしないと、複数のパイプが、同じデータファイルのセットをターゲットテーブルにロードする可能性があります。たとえば、この状況は、複数のパイプ定義が <ストレージの場所>/path1/
や <ストレージの場所>/path1/path2/
など、細分性のレベルが異なる同じストレージの場所を参照している場合に発生する可能性があります。この例では、ファイルが <ストレージの場所>/path1/path2/
でステージングされている場合は、両方のパイプがファイルのコピーをロードします。
SHOW PIPES を実行するか、Account Usageの PIPES ビューまたはInformation Schemaの PIPES ビューのいずれかをクエリして、アカウントにあるパイプの定義すべての COPY INTO <テーブル> ステートメントを表示します。
ステップ4: セキュリティを構成する¶
Snowpipeを使用して連続的なデータロードを実行するユーザーごとに、データロードのオブジェクト(ターゲットデータベース、スキーマ、テーブル、ステージオブジェクト、パイプ)に十分なアクセス制御権限を付与します。
注釈
「最小権限」の一般原則に従うために、パイプを使用してファイルを取り込むために使用する別個のユーザーとロールを作成することをお勧めします。ユーザーは、このロールを既定のロールとして作成する必要があります。
Snowpipeを使用するには、次の権限を持つロールが必要です。
オブジェクト |
権限 |
メモ |
---|---|---|
名前付きパイプ |
OWNERSHIP |
|
名前付きストレージ統合 |
USAGE |
ステップ2:ステージを作成する(必要な場合) で作成したステージがステージ統合を参照する場合にのみ必要です。 |
名前付きステージ |
USAGE , READ |
|
名前付きファイル形式 |
USAGE |
オプション: ステップ2:ステージを作成する(必要な場合) で作成したステージが名前付きファイル形式を参照する場合にのみ必要です。 |
ターゲットデータベース |
USAGE |
|
ターゲットスキーマ |
USAGE |
|
ターゲットテーブル |
INSERT , SELECT |
GRANT <権限> コマンドを使用して、ロールに権限を付与します。
注釈
セキュリティ管理者(つまり、 SECURITYADMIN ロールを持つユーザー)以上のみがロールを作成できます。
たとえば、 snowpipe_db.public
データベースオブジェクトのセットと mypipe
という名前のパイプにアクセスできる snowpipe_role
という名前のロールを作成します。次に、ロールをユーザーに付与します。
-- Create a role to contain the Snowpipe privileges use role securityadmin; create or replace role snowpipe_role; -- Grant the required privileges on the database objects grant usage on database snowpipe_db to role snowpipe_role; grant usage on schema snowpipe_db.public to role snowpipe_role; grant insert, select on snowpipe_db.public.mytable to role snowpipe_role; grant usage, read on stage snowpipe_db.public.mystage to role snowpipe_role; -- Pause the pipe for OWNERSHIP transfer alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true; -- Grant the OWNERSHIP privilege on the pipe object grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role; -- Grant the role to a user grant role snowpipe_role to user jsmith; -- Set the role as the default role for the user alter user jsmith set default_role = snowpipe_role; -- Resume the pipe alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
自動インジェストを使用したSnowpipeが構成されました。
新しいデータファイルがS3バケットに追加されると、イベント通知はSnowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。
ステップ5: 履歴ファイルをロードする¶
SQS 通知が構成される 前に 外部ステージに存在したデータファイルのバックログをロードするには、 履歴データのロード をご参照ください。
ステップ6: ステージングされたファイルを削除する¶
データが正常にロードされ、ファイルが不要になったら、ステージングされたファイルを削除します。手順については、 Snowpipeがデータをロードした後のステージングされたファイルの削除 をご参照ください。
オプション3: Snowpipeを自動化するためにAmazon EventBridge を設定する¶
オプション2と同様に、Amazon EventBridge を設定してSnowpipeを自動化することもできます。
ステップ1: Amazon SNS トピックを作成する¶
前提条件: Amazon SNS トピックとサブスクリプションを作成する (このトピック内)に従います。
ステップ2: EventBridge ルールを作成してS3バケットをサブスクライブし、 SNS トピックに通知を送信します¶
S3バケットの Amazon EventBridge を有効にします。
EventBridge ルールを作成し、ステップ1で作成した SNS トピックに 通知を送信 します。
ステップ3: SQS 通知を使用してSnowpipeを自動化するためにAmazon SNS を構成する¶
オプション2: SQS 通知を使用してSnowpipeを自動化するために、Amazon SNS を構成する (このトピック内)に従います。
SYSTEM$PIPE_STATUS 出力¶
SYSTEM$PIPE_STATUS 関数は、パイプの現在のステータスの JSON 表現を取得します。
TRUEに設定された AUTO_INGEST のパイプの場合、関数は次の名前/値のペアを含む JSON オブジェクトを返します(現在のパイプステータスに該当する場合)。
{"executionState":"<値>","oldestFileTimestamp":<値>,"pendingFileCount":<値>,"notificationChannelName":"<値>","numOutstandingMessagesOnChannel":<値値>,"lastReceivedMessageTimestamp":"<値>","lastForwardedMessageTimestamp":"<値>","error":<値>,"fault":<値>}
出力値の説明については、 SQL 関数の参照トピックをご参照ください。