- カテゴリ:
CREATE PIPE¶
インジェスチョンキューからテーブルにデータをロードするために Snowpipe が使用する COPY INTO <テーブル> ステートメントを定義するために、システムに新しいパイプを作成します。
- こちらもご参照ください。
構文¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
必須パラメーター¶
名前
パイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。
識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例:
"My object"
)。二重引用符で囲まれた識別子も大文字と小文字が区別されます。詳細については、 識別子の要件 をご参照ください。
ステートメントのコピー
キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。
オプションのパラメーター¶
AUTO_INGEST = TRUE | FALSE
構成されたメッセージサービスからイベント通知を受信したときに、指定した外部ステージおよびオプションのパスからデータファイルを自動的にロードするかどうかを指定します。
TRUE
は、自動データロードを有効にします。Snowpipeは、外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートしています。
FALSE
は、自動データロードを無効にします。データファイルをロードするには、Snowpipe REST API エンドポイントを呼び出す必要があります。Snowpipeは、内部ステージ(つまり、Snowflakeの名前付きステージまたはテーブルステージであるが、ユーザーステージ ではない )、または外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートします。
ERROR_INTEGRATION = '統合名'
Amazon Simple Notification Service(SNS)を使用して、エラー通知を送信するようにSnowpipeを設定する場合にのみ必要です。 Amazon SNS との通信に使用される通知統合の名前を指定します。詳細については、 Snowpipeのエラー通知の有効化 をご参照ください。
AWS_SNS_TOPIC = '文字列'
SNS を使用してAmazon S3ステージの AUTO_INGEST を設定する場合にのみ必要です。 S3バケットの SNS トピックのAmazonリソースネーム(ARN)を指定します。 CREATE PIPE ステートメントは、指定された SNS トピックにAmazon Simple Queue Service(SQS)キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされた取り込みキューにファイルをコピーします。詳細については、 Amazon S3用Snowpipeの自動化 をご参照ください。
INTEGRATION = '文字列'
Microsoft Azureステージの AUTO_INGEST を構成する場合のみに必要です。 Azureストレージキューへのアクセスに使用される既存の通知統合を指定します。詳細については、次をご参照ください。
統合名はすべて大文字で入力する必要があります。
COMMENT = '文字列リテラル'
パイプのコメントを指定します。
デフォルト:値なし
使用上の注意¶
この SQL コマンドには、次の最小権限が必要です。
権限
オブジェクト
メモ
CREATE PIPE
スキーマ
USAGE
パイプ定義のステージ
外部ステージのみ
READ
パイプ定義のステージ
内部ステージのみ
SELECT, INSERT
パイプ定義にあるテーブル
スキーマオブジェクトでの SQL 操作には、オブジェクトを含むデータベースとスキーマに対する USAGE 権限も必要です。
次を 除く すべての COPY INTO <テーブル> コピーオプションがサポートされています。
FILES = ( 'ファイル名1' [ , 'ファイル名2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = 数値
PURGE = TRUE | FALSE
(つまり、ロード中の自動パージ)FORCE = TRUE | FALSE
REMOVE コマンドを使用して、内部(つまりSnowflake)ステージからファイルを(ロード後に)手動で削除できます。
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
PATTERN = '正規表現パターン'
コピーオプションのサポートは、 プレビュー機能 として提供されます。コピーオプションは、正規表現を使用してロードするファイルのセットをフィルターします。パターンマッチングは、 AUTO_INGEST パラメーター値に応じて次のように動作します。AUTO_INGEST = TRUE
:正規表現は、ステージ内のファイルのリスト、および COPY INTO <テーブル> ステートメント内のオプションのパス(つまり、クラウドストレージの場所)をフィルターします。:AUTO_INGEST = FALSE
: 正規表現は、Snowpipe REST APIinsertFiles
エンドポイントへの呼び出しで送信されたファイルのリストをフィルターします。
Snowpipeは、ステージ定義内のパスセグメントを保存場所からトリムし、残りのパスセグメントとファイル名に正規表現を適用することに注意してください。ステージ定義を表示するには、ステージに対して DESCRIBE STAGE コマンドを実行します。URL プロパティは、バケット名またはコンテナー名と0個以上のパスセグメントで構成されます。たとえば、 COPY INTO <テーブル> ステートメントの FROM の場所が
@s/path1/path2/
で、ステージ@s
の URL の値がs3://mybucket/path1/
の場合、Snowpipeは保存場所から/path1/
をトリムします。 FROM 句で、正規表現をpath2/
とパス内のファイル名に適用します。列の並べ替え、列の省略、およびキャスト(つまり、ロード中のデータの変換)の COPY ステートメントのソースとしてクエリを使用することはサポートされています。使用例については、 ロード中のデータの変換 をご参照ください。単純な SELECT ステートメントのみがサポートされています。 WHERE 句を使用したフィルタリングはサポートされていません。
パイプ定義は動的ではありません(つまり、ステージ/テーブルの名前変更やドロップなど、基になるステージまたはテーブルが変更された場合、パイプは自動的に更新されません)。代わりに、新しいパイプを作成し、今後のSnowpipe REST API 呼び出しでこのパイプ名を送信する必要があります。
メタデータについて。
注意
Snowflakeサービスを使用する場合、お客様は、個人データ(ユーザーオブジェクト向け以外)、機密データ、輸出管理データ、またはその他の規制されたデータがメタデータとして入力されていないことを確認する必要があります。詳細については、 Snowflakeのメタデータフィールド をご参照ください。
重要
パイプを再作成(CREATE OR REPLACE PIPE 構文を使用)する場合は、関連する考慮事項とベストプラクティスについて、 パイプの再作成 をご参照ください。
例¶
mystage
ステージにステージングされたファイルから mytable
にすべてのデータをロードするパイプを現在のスキーマに作成します。
create pipe mypipe as copy into mytable from @mystage;
前の例と同じですが、データ変換があります。ステージングされたファイルの4番目と5番目の列からのデータのみを逆順でロードします。
create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
メッセージングサービスから受信したイベント通知を使用して、データの自動ロード用として現在のスキーマにパイプを作成します。
Amazon S3
create pipe mypipe_s3 auto_ingest = true aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Google Cloud Storage
create pipe mypipe_gcs auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Microsoft Azure
create pipe mypipe_azure auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');