CREATE PIPE¶
Snowpipe で使用される:doc:/sql-reference/sql/copy-into-table`ステートメントを定義するために、システム内に新しいパイプを作成してインジェスションキューからデータをロードします。または :doc:`高性能アーキテクチャのSnowpipe Streaming </user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview> により、ストリーミングソースから直接テーブルにデータをロードします。
- こちらもご参照ください。
構文¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
注釈
<copy_statement> を2つの異なるタイプのデータソースと一緒に使用することができます。
ステージングされた場所:
COPY INTO mytable FROM @mystage ...ストリーミングソース:
COPY INTO mytable FROM (SELECT ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
必須パラメーター¶
nameパイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。
識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例:
"My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。詳細については、 識別子の要件 をご参照ください。
copy_statementキューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。
注釈
現在、Snowpipeの copy_statement で次の関数を使用することはお勧めしません。
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
これらの関数を使用して挿入された時間値は、 COPY_HISTORY 関数 または COPY_HISTORY ビュー によって返される LOAD_TIME 値よりも数時間早くなる可能性があるという既知の問題があります。
代わりに METADATA$START_SCAN_TIME をクエリすることをお勧めします。これにより、記録のロードがより正確に表現されます。
オプションのパラメーター¶
AUTO_INGEST = TRUE | FALSE内部ステージと外部ステージのどちらからデータファイルを自動的に読み込むかを指定します。
TRUEは、自動データロードを有効にします。Snowpipeは、外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートしています。
FALSEは、自動データロードを無効にします。データファイルをロードするには、Snowpipe REST API エンドポイントを呼び出す必要があります。Snowpipeは、内部ステージ(つまり、Snowflakeの名前付きステージまたはテーブルステージであるが、ユーザーステージ ではない )、または外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートします。
ERROR_INTEGRATION = 'integration_name'Snowpipeを構成して、クラウドメッセージングサービスにエラー通知を送信する場合にのみ必要です。
メッセージングサービスとの通信に使用する通知統合の名前を指定します。詳細については、 Snowpipeのエラー通知 をご参照ください。
AWS_SNS_TOPIC = 'string'SNS を使用してAmazon S3外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。
S3バケットの SNS トピックのAmazonリソースネーム(ARN)を指定します。CREATE PIPE ステートメントは、指定された SNS トピックにAmazon Simple Queue Service(SQS)キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされた取り込みキューにファイルをコピーします。詳細については、 Amazon S3用Snowpipeの自動化 をご参照ください。
INTEGRATION = 'string'Google Cloud StorageまたはMicrosoft Azure外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。
ストレージキューへのアクセスに使用する既存の通知統合を指定します。詳細については、次をご参照ください。
統合名はすべて大文字で入力する必要があります。
COMMENT = 'string_literal'パイプのコメントを指定します。
デフォルト:値なし
高性能アーキテクチャのSnowpipe Streaming用パイプ¶
Snowpipe Streaming用のパイプを定義して、ステージングされたファイルの場所を必要とせず、Snowpipe Streaming API から直接データをロードすることができます。このメソッドは、低レイテンシ、行ベースのインジェスションのために設計されています。
ストリーミングパイプの COPY INTO ステートメントは、FROM 句にある DATA_SOURCE のテーブル関数を TYPE=> 'STREAMING' 引数と一緒に使用する必要があります。
注釈
Pipes created for streaming don't require an
AUTO_INGESTparameter or aFROM @stageclause.ストリーミングパイプの定義内の
copy_statementは、API から受信したデータを変換し、ロードするために使用されます。Snowpipe Streamingは各テーブルの デフォルトのパイプ を提供します。これはオンデマンドで自動的に作成されます。カスタムパイプを作成する必要があるのは、処理中の変換や事前クラスタリングなどの機能が必要な場合のみです。
例については、 例 をご参照ください。
使用上の注意¶
この SQL コマンドには、次の最小権限が必要です。
権限
オブジェクト
注意
CREATE PIPE
スキーマ
USAGE
パイプ定義のステージ
外部ステージのみ
USAGE
統合
Snowpipeのエラー通知を受信するために必要です。
READ
パイプ定義のステージ
内部ステージのみ
SELECT, INSERT
パイプ定義にあるテーブル
スキーマオブジェクトでの SQL 操作には、オブジェクトを含むデータベースとスキーマに対する USAGE 権限も必要です。
次を 除く、すべての COPY INTO <テーブル> コピーオプションがサポートされています。
FILES = ( 'file_name1' [ , 'file_name2', ... ] )ON_ERROR = ABORT_STATEMENTSIZE_LIMIT = numPURGE = TRUE | FALSE(つまり、ロード中の自動パージ)FORCE = TRUE | FALSEREMOVE コマンドを使用して、内部(つまりSnowflake)ステージからファイルを(ロード後に)手動で削除できます。
RETURN_FAILED_ONLY = TRUE | FALSEVALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
PATTERN = 'regex_pattern'コピーオプションは、正規表現を使用してロードするファイルのセットをフィルターします。パターンマッチングは、 AUTO_INGEST パラメーター値に応じて次のように動作します。AUTO_INGEST = TRUE:正規表現は、ステージ内のファイルのリスト、および COPY INTO <テーブル> ステートメント内のオプションのパス(つまり、クラウドストレージの場所)をフィルターします。:AUTO_INGEST = FALSE: 正規表現は、Snowpipe REST APIinsertFilesエンドポイントへの呼び出しで送信されたファイルのリストをフィルターします。
Snowpipeは、ステージ定義内のパスセグメントを保存場所からトリムし、残りのパスセグメントとファイル名に正規表現を適用します。ステージ定義を表示するには、ステージに対して DESCRIBE STAGE コマンドを実行します。URL プロパティは、バケット名またはコンテナー名と0個以上のパスセグメントで構成されます。たとえば、 COPY INTO <table> ステートメントの FROM の場所が``@s/path1/path2/
で、ステージ``@s``のURL値が``s3://mybucket/path1/``である場合、Snowpipeは FROM 句のストレージの場所から``s3://mybucket/path1/path2/を削除し、残りのパス内のファイル名に正規表現を適用します。>* statement is ```` and the URL value for stage ```` is ````, then Snowpipe trims ``重要
Snowflakeは、Snowpipe用のクラウドイベントフィルタリングを有効にして、コスト、イベントノイズ、および遅延を削減するようにお勧めします。PATTERN オプションは、クラウドプロバイダーのイベントフィルタリング機能では不十分な場合にのみ使用してください。各クラウドプロバイダーのイベントフィルタリングの構成に関する詳細については、次のページをご参照ください。
Amazon S3: オブジェクトキー名のフィルタリングを使用したイベント通知の構成
Microsoft Azure Event Grid: Event Gridサブスクリプションのイベントフィルタリングについて
Google Cloud Pub/Sub: メッセージのフィルタリング
列の並べ替え、列の省略、およびキャスト(つまり、ロード中のデータの変換)の COPY ステートメントのソースとしてクエリを使用することはサポートされています。使用例については、 ロード中のデータの変換 をご参照ください。単純な SELECT ステートメントのみがサポートされています。WHERE 句を使用したフィルタリングはサポートされていません。
パイプ定義は動的ではありません(つまり、ステージ/テーブルの名前変更やドロップなど、基になるステージまたはテーブルが変更された場合、パイプは自動的に更新されません)。代わりに、新しいパイプを作成し、今後のSnowpipe REST API 呼び出しでこのパイプ名を送信する必要があります。
メタデータについて。
注意
Snowflakeサービスを使用する場合、お客様は、個人データ(ユーザーオブジェクト向け以外)、機密データ、輸出管理データ、またはその他の規制されたデータがメタデータとして入力されていないことを確認する必要があります。詳細については、 Snowflakeのメタデータフィールド をご参照ください。
OR REPLACE 句と IF NOT EXISTS 句は互いに排他的です。この2つを同じステートメントで使うことはできません。
CREATE OR REPLACE <オブジェクト> ステートメントはアトミックです。つまり、オブジェクトが置き換えられると、単一のトランザクションで、古いオブジェクトが削除されて新しいオブジェクトが作成されます。
重要
パイプを再作成(CREATE OR REPLACE PIPE 構文を使用)する場合は、関連する考慮事項とベストプラクティスについて、 パイプの再作成 をご参照ください。
例¶
mystage ステージにステージングされたファイルから mytable にすべてのデータをロードするパイプを現在のスキーマに作成します。
CREATE PIPE mypipe
AS
COPY INTO mytable
FROM @mystage
FILE_FORMAT = (TYPE = 'JSON');
前の例と同じですが、データ変換があります。ステージングされたファイルの4番目と5番目の列からのデータのみを逆順でロードします。
CREATE PIPE mypipe2
AS
COPY INTO mytable(C1, C2)
FROM (SELECT $5, $4 FROM @mystage)
FILE_FORMAT = (TYPE = 'JSON');
データで表される対応する列に一致するターゲットテーブルの列にすべてのデータをロードするパイプを作成します。列名は大文字と小文字を区別しません。
さらに、 METADATA$START_SCAN_TIME および METADATA$FILENAME メタデータ列 から、 c1 および c2 という列にメタデータをロードします。
CREATE PIPE mypipe3
AS
(COPY INTO mytable
FROM @mystage
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME)
FILE_FORMAT = (TYPE = 'JSON'));
メッセージングサービスから受信したイベント通知を使用して、データの自動ロード用として現在のスキーマにパイプを作成します。
Amazon S3
CREATE PIPE mypipe_s3
AUTO_INGEST = TRUE
AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Google Cloud Storage
CREATE PIPE mypipe_gcs
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Microsoft Azure
CREATE PIPE mypipe_azure
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
名前付き内部ステージ
現在のスキーマで、 mystage という名前の内部ステージ上のすべてのデータファイルを自動的に読み込むパイプを作成します。
CREATE PIPE mypipe_aws AUTO_INGEST = TRUE AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');
Snowpipe Streaming with high-performance architecture
基本的なストリーミングパイプを作成します。
CREATE OR REPLACE PIPE my_streaming_pipe
AS COPY INTO my_table
FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
SELECT 句で列ごとの式を指定することで、処理中の変換を使用してストリーミングパイプを作成します。
CREATE OR REPLACE PIPE my_pipe_with_transforms
AS COPY INTO my_table (col1, col2, col3)
FROM (
SELECT
$1:field1::STRING AS col1,
$1:field2::NUMBER AS col2,
CURRENT_TIMESTAMP() AS col3
FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
);
クエリのパフォーマンスを向上させるために、事前クラスタリングを有効にしてストリーミングパイプを作成します。ターゲットテーブルには、クラスタリングキーが定義されている必要があります。
CREATE OR REPLACE PIPE my_pipe_with_clustering
AS COPY INTO my_table
FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
CLUSTER_AT_INGEST_TIME = TRUE;