CREATE PIPE¶
インジェスチョンキューからテーブルにデータをロードするために Snowpipe が使用する COPY INTO <テーブル> ステートメントを定義するために、システムに新しいパイプを作成します。
- こちらもご参照ください。
構文¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
必須パラメーター¶
name
パイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。
識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例:
"My object"
)。二重引用符で囲まれた識別子も大文字と小文字が区別されます。詳細については、 識別子の要件 をご参照ください。
copy_statement
キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。
注釈
現在、Snowpipeの copy_statement
で次の関数を使用することはお勧めしません。
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
これらの関数を使用して挿入された時間値は、 COPY_HISTORY 関数 または COPY_HISTORY ビュー によって返される LOAD_TIME 値よりも数時間早くなる可能性があるという既知の問題があります。
代わりに METADATA$START_SCAN_TIME をクエリすることをお勧めします。これにより、記録のロードがより正確に表現されます。
オプションのパラメーター¶
AUTO_INGEST = TRUE | FALSE
内部ステージと外部ステージのどちらからデータファイルを自動的に読み込むかを指定します。
TRUE
は、自動データロードを有効にします。Snowpipeは、外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートしています。
FALSE
は、自動データロードを無効にします。データファイルをロードするには、Snowpipe REST API エンドポイントを呼び出す必要があります。Snowpipeは、内部ステージ(つまり、Snowflakeの名前付きステージまたはテーブルステージであるが、ユーザーステージ ではない )、または外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートします。
ERROR_INTEGRATION = 'integration_name'
Snowpipeを構成して、クラウドメッセージングサービスにエラー通知を送信する場合にのみ必要です。
メッセージングサービスとの通信に使用する通知統合の名前を指定します。詳細については、 Snowpipeのエラー通知 をご参照ください。
AWS_SNS_TOPIC = 'string'
SNS を使用してAmazon S3外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。
S3バケットの SNS トピックのAmazonリソースネーム(ARN)を指定します。 CREATE PIPE ステートメントは、指定された SNS トピックにAmazon Simple Queue Service(SQS)キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされた取り込みキューにファイルをコピーします。詳細については、 Amazon S3用Snowpipeの自動化 をご参照ください。
INTEGRATION = 'string'
Google Cloud StorageまたはMicrosoft Azure外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。
ストレージキューへのアクセスに使用する既存の通知統合を指定します。詳細については、次をご参照ください。
統合名はすべて大文字で入力する必要があります。
COMMENT = 'string_literal'
パイプのコメントを指定します。
デフォルト:値なし
使用上の注意¶
この SQL コマンドには、次の最小権限が必要です。
権限
オブジェクト
注意
CREATE PIPE
スキーマ
USAGE
パイプ定義のステージ
外部ステージのみ
USAGE
統合
Snowpipeのエラー通知を受信するために必要です。
READ
パイプ定義のステージ
内部ステージのみ
SELECT, INSERT
パイプ定義にあるテーブル
スキーマオブジェクトでの SQL 操作には、オブジェクトを含むデータベースとスキーマに対する USAGE 権限も必要です。
次を 除く、すべての COPY INTO <テーブル> コピーオプションがサポートされています。
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = num
PURGE = TRUE | FALSE
(つまり、ロード中の自動パージ)FORCE = TRUE | FALSE
REMOVE コマンドを使用して、内部(つまりSnowflake)ステージからファイルを(ロード後に)手動で削除できます。
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
PATTERN = 'regex_pattern'
コピーオプションは、正規表現を使用してロードするファイルのセットをフィルターします。パターンマッチングは、 AUTO_INGEST パラメーター値に応じて次のように動作します。AUTO_INGEST = TRUE
:正規表現は、ステージ内のファイルのリスト、および COPY INTO <テーブル> ステートメント内のオプションのパス(つまり、クラウドストレージの場所)をフィルターします。:AUTO_INGEST = FALSE
: 正規表現は、Snowpipe REST APIinsertFiles
エンドポイントへの呼び出しで送信されたファイルのリストをフィルターします。
Snowpipeは、ステージ定義内のパスセグメントを保存場所からトリムし、残りのパスセグメントとファイル名に正規表現を適用することに注意してください。ステージ定義を表示するには、ステージに対して DESCRIBE STAGE コマンドを実行します。URL プロパティは、バケット名またはコンテナー名と0個以上のパスセグメントで構成されます。たとえば、 COPY INTO <テーブル> ステートメントの FROM の場所が
@s/path1/path2/
で、ステージ@s
の URL の値がs3://mybucket/path1/
の場合、Snowpipeは保存場所から/path1/
をトリムします。 FROM 句で、正規表現をpath2/
とパス内のファイル名に適用します。重要
Snowflakeは、Snowpipe用のクラウドイベントフィルタリングを有効にして、コスト、イベントノイズ、および遅延を削減するようにお勧めします。PATTERN オプションは、クラウドプロバイダーのイベントフィルタリング機能では不十分な場合にのみ使用してください。各クラウドプロバイダーのイベントフィルタリングの構成に関する詳細については、次のページをご参照ください。
Amazon S3: オブジェクトキー名のフィルタリングを使用したイベント通知の構成
Microsoft Azure Event Grid: Event Gridサブスクリプションのイベントフィルタリングについて
Google Cloud Pub/Sub: メッセージのフィルタリング
列の並べ替え、列の省略、およびキャスト(つまり、ロード中のデータの変換)の COPY ステートメントのソースとしてクエリを使用することはサポートされています。使用例については、 ロード中のデータの変換 をご参照ください。単純な SELECT ステートメントのみがサポートされています。 WHERE 句を使用したフィルタリングはサポートされていません。
パイプ定義は動的ではありません(つまり、ステージ/テーブルの名前変更やドロップなど、基になるステージまたはテーブルが変更された場合、パイプは自動的に更新されません)。代わりに、新しいパイプを作成し、今後のSnowpipe REST API 呼び出しでこのパイプ名を送信する必要があります。
メタデータについて。
注意
Snowflakeサービスを使用する場合、お客様は、個人データ(ユーザーオブジェクト向け以外)、機密データ、輸出管理データ、またはその他の規制されたデータがメタデータとして入力されていないことを確認する必要があります。詳細については、 Snowflakeのメタデータフィールド をご参照ください。
OR REPLACE
とIF NOT EXISTS
句は互いに排他的です。両方を同じステートメントで使うことはできません。CREATE OR REPLACE <オブジェクト> ステートメントはアトミックです。つまり、オブジェクトが置き換えられると、単一のトランザクションで、古いオブジェクトが削除されて新しいオブジェクトが作成されます。
重要
パイプを再作成(CREATE OR REPLACE PIPE 構文を使用)する場合は、関連する考慮事項とベストプラクティスについて、 パイプの再作成 をご参照ください。
例¶
mystage
ステージにステージングされたファイルから mytable
にすべてのデータをロードするパイプを現在のスキーマに作成します。
CREATE PIPE mypipe
AS
COPY INTO mytable
FROM @mystage
FILE_FORMAT = (TYPE = 'JSON');
前の例と同じですが、データ変換があります。ステージングされたファイルの4番目と5番目の列からのデータのみを逆順でロードします。
CREATE PIPE mypipe2
AS
COPY INTO mytable(C1, C2)
FROM (SELECT $5, $4 FROM @mystage)
FILE_FORMAT = (TYPE = 'JSON');
データで表される対応する列に一致するターゲットテーブルの列にすべてのデータをロードするパイプを作成します。列名は大文字と小文字を区別しません。
さらに、 METADATA$START_SCAN_TIME および METADATA$FILENAME メタデータ列 から、 c1
および c2
という列にメタデータをロードします。
CREATE PIPE mypipe3
AS
(COPY INTO mytable
FROM @mystage
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME)
FILE_FORMAT = (TYPE = 'JSON'));
メッセージングサービスから受信したイベント通知を使用して、データの自動ロード用として現在のスキーマにパイプを作成します。
Amazon S3
CREATE PIPE mypipe_s3
AUTO_INGEST = TRUE
AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Google Cloud Storage
CREATE PIPE mypipe_gcs
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Microsoft Azure
CREATE PIPE mypipe_azure
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
名前付き内部ステージ
現在のスキーマで、 mystage
という名前の内部ステージ上のすべてのデータファイルを自動的に読み込むパイプを作成します。
CREATE PIPE mypipe_aws AUTO_INGEST = TRUE AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');