CREATE PIPE¶
Snowpipe で使用される:doc:/sql-reference/sql/copy-into-table`ステートメントを定義するために、システム内に新しいパイプを作成してインジェスションキューからデータをロードします。または :doc:`高性能アーキテクチャのSnowpipe Streaming </user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview> により、ストリーミングソースから直接テーブルにデータをロードします。
- こちらもご参照ください。
構文¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ ERROR_INTEGRATION = <integration_name> ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
注釈
<copy_statement> を2つの異なるタイプのデータソースと一緒に使用することができます。
- ステージングされた場所: - COPY INTO mytable FROM @mystage ...
- ストリーミングソース: - COPY INTO mytable FROM (SELECT ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
必須パラメーター¶
- name
- パイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。 - 識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例: - "My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。- 詳細については、 識別子の要件 をご参照ください。 
- copy_statement
- キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。 
注釈
現在、Snowpipeの copy_statement で次の関数を使用することはお勧めしません。
- CURRENT_DATE 
- CURRENT_TIME 
- CURRENT_TIMESTAMP 
- GETDATE 
- LOCALTIME 
- LOCALTIMESTAMP 
- SYSDATE 
- SYSTIMESTAMP 
これらの関数を使用して挿入された時間値は、 COPY_HISTORY 関数 または COPY_HISTORY ビュー によって返される LOAD_TIME 値よりも数時間早くなる可能性があるという既知の問題があります。
代わりに METADATA$START_SCAN_TIME をクエリすることをお勧めします。これにより、記録のロードがより正確に表現されます。
オプションのパラメーター¶
- AUTO_INGEST = TRUE | FALSE
- 内部ステージと外部ステージのどちらからデータファイルを自動的に読み込むかを指定します。 - TRUEは、自動データロードを有効にします。- Snowpipeは、外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートしています。 
- FALSEは、自動データロードを無効にします。データファイルをロードするには、Snowpipe REST API エンドポイントを呼び出す必要があります。- Snowpipeは、内部ステージ(つまり、Snowflakeの名前付きステージまたはテーブルステージであるが、ユーザーステージ ではない )、または外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートします。 
 
- ERROR_INTEGRATION = 'integration_name'
- Snowpipeを構成して、クラウドメッセージングサービスにエラー通知を送信する場合にのみ必要です。 - メッセージングサービスとの通信に使用する通知統合の名前を指定します。詳細については、 Snowpipeのエラー通知 をご参照ください。 
- AWS_SNS_TOPIC = 'string'
- SNS を使用してAmazon S3外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。 - S3バケットの SNS トピックのAmazonリソースネーム(ARN)を指定します。CREATE PIPE ステートメントは、指定された SNS トピックにAmazon Simple Queue Service(SQS)キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされた取り込みキューにファイルをコピーします。詳細については、 Amazon S3用Snowpipeの自動化 をご参照ください。 
- INTEGRATION = 'string'
- Google Cloud StorageまたはMicrosoft Azure外部ステージ用に AUTO_INGEST を構成する場合にのみ必要です。 - ストレージキューへのアクセスに使用する既存の通知統合を指定します。詳細については、次をご参照ください。 - 統合名はすべて大文字で入力する必要があります。 
- COMMENT = 'string_literal'
- パイプのコメントを指定します。 - デフォルト:値なし 
高性能アーキテクチャのSnowpipe Streaming用パイプ¶
Snowpipe Streaming用のパイプを定義して、ステージングされたファイルの場所を必要とせず、Snowpipe Streaming API から直接データをロードすることができます。このメソッドは、低レイテンシ、行ベースのインジェスションのために設計されています。
ストリーミングパイプの COPY INTO ステートメントは、FROM 句にある DATA_SOURCE のテーブル関数を TYPE=> 'STREAMING' 引数と一緒に使用する必要があります。
例:
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE
FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
注釈
- ストリーミング用に作成されたパイプには、 - AUTO_INGESTパラメーターまたは- FROM @stage句は必要ありません。
- ストリーミングパイプの定義内の - copy_statementは、API から受信したデータを変換し、ロードするために使用されます。
使用上の注意¶
- この SQL コマンドには、次の最小権限が必要です。 - 権限 - オブジェクト - 注意 - CREATE PIPE - スキーマ - USAGE - パイプ定義のステージ - 外部ステージのみ - USAGE - 統合 - Snowpipeのエラー通知を受信するために必要です。 - READ - パイプ定義のステージ - 内部ステージのみ - SELECT, INSERT - パイプ定義にあるテーブル - スキーマオブジェクトでの SQL 操作には、オブジェクトを含むデータベースとスキーマに対する USAGE 権限も必要です。 
- 次を 除く、すべての COPY INTO <テーブル> コピーオプションがサポートされています。 - FILES = ( 'file_name1' [ , 'file_name2', ... ] )
- ON_ERROR = ABORT_STATEMENT
- SIZE_LIMIT = num
- PURGE = TRUE | FALSE(つまり、ロード中の自動パージ)
- FORCE = TRUE | FALSE- REMOVE コマンドを使用して、内部(つまりSnowflake)ステージからファイルを(ロード後に)手動で削除できます。 
- RETURN_FAILED_ONLY = TRUE | FALSE
- VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
 
- PATTERN = 'regex_pattern'コピーオプションは、正規表現を使用してロードするファイルのセットをフィルターします。パターンマッチングは、 AUTO_INGEST パラメーター値に応じて次のように動作します。- AUTO_INGEST = TRUE:正規表現は、ステージ内のファイルのリスト、および COPY INTO <テーブル> ステートメント内のオプションのパス(つまり、クラウドストレージの場所)をフィルターします。
- :AUTO_INGEST = FALSE: 正規表現は、Snowpipe REST API- insertFilesエンドポイントへの呼び出しで送信されたファイルのリストをフィルターします。
 - Snowpipeは、ステージ定義内のパスセグメントを保存場所からトリムし、残りのパスセグメントとファイル名に正規表現を適用することに注意してください。ステージ定義を表示するには、ステージに対して DESCRIBE STAGE コマンドを実行します。URL プロパティは、バケット名またはコンテナー名と0個以上のパスセグメントで構成されます。たとえば、 COPY INTO <テーブル> ステートメントの FROM の場所が - @s/path1/path2/で、ステージ- @sの URL の値が- s3://mybucket/path1/の場合、Snowpipeは保存場所から- /path1/をトリムします。 FROM 句で、正規表現を- path2/とパス内のファイル名に適用します。- 重要 - Snowflakeは、Snowpipe用のクラウドイベントフィルタリングを有効にして、コスト、イベントノイズ、および遅延を削減するようにお勧めします。PATTERN オプションは、クラウドプロバイダーのイベントフィルタリング機能では不十分な場合にのみ使用してください。各クラウドプロバイダーのイベントフィルタリングの構成に関する詳細については、次のページをご参照ください。 - Amazon S3: オブジェクトキー名のフィルタリングを使用したイベント通知の構成 
- Microsoft Azure Event Grid: Event Gridサブスクリプションのイベントフィルタリングについて 
- Google Cloud Pub/Sub: メッセージのフィルタリング 
 
- 列の並べ替え、列の省略、およびキャスト(つまり、ロード中のデータの変換)の COPY ステートメントのソースとしてクエリを使用することはサポートされています。使用例については、 ロード中のデータの変換 をご参照ください。単純な SELECT ステートメントのみがサポートされています。WHERE 句を使用したフィルタリングはサポートされていません。 
- パイプ定義は動的ではありません(つまり、ステージ/テーブルの名前変更やドロップなど、基になるステージまたはテーブルが変更された場合、パイプは自動的に更新されません)。代わりに、新しいパイプを作成し、今後のSnowpipe REST API 呼び出しでこのパイプ名を送信する必要があります。 
- メタデータについて。 - 注意 - Snowflakeサービスを使用する場合、お客様は、個人データ(ユーザーオブジェクト向け以外)、機密データ、輸出管理データ、またはその他の規制されたデータがメタデータとして入力されていないことを確認する必要があります。詳細については、 Snowflakeのメタデータフィールド をご参照ください。 
- OR REPLACEと- IF NOT EXISTS句は互いに排他的です。両方を同じステートメントで使うことはできません。
- CREATE OR REPLACE <オブジェクト> ステートメントはアトミックです。つまり、オブジェクトが置き換えられると、単一のトランザクションで、古いオブジェクトが削除されて新しいオブジェクトが作成されます。 
重要
パイプを再作成(CREATE OR REPLACE PIPE 構文を使用)する場合は、関連する考慮事項とベストプラクティスについて、 パイプの再作成 をご参照ください。
例¶
mystage ステージにステージングされたファイルから mytable にすべてのデータをロードするパイプを現在のスキーマに作成します。
CREATE PIPE mypipe
  AS
  COPY INTO mytable
  FROM @mystage
  FILE_FORMAT = (TYPE = 'JSON');
前の例と同じですが、データ変換があります。ステージングされたファイルの4番目と5番目の列からのデータのみを逆順でロードします。
CREATE PIPE mypipe2
  AS
  COPY INTO mytable(C1, C2)
  FROM (SELECT $5, $4 FROM @mystage)
  FILE_FORMAT = (TYPE = 'JSON');
データで表される対応する列に一致するターゲットテーブルの列にすべてのデータをロードするパイプを作成します。列名は大文字と小文字を区別しません。
さらに、 METADATA$START_SCAN_TIME および METADATA$FILENAME メタデータ列 から、 c1 および c2 という列にメタデータをロードします。
CREATE PIPE mypipe3
  AS
  (COPY INTO mytable
    FROM @mystage
    MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
    INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME)
    FILE_FORMAT = (TYPE = 'JSON'));
メッセージングサービスから受信したイベント通知を使用して、データの自動ロード用として現在のスキーマにパイプを作成します。
Amazon S3
CREATE PIPE mypipe_s3
  AUTO_INGEST = TRUE
  AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
Google Cloud Storage
CREATE PIPE mypipe_gcs
  AUTO_INGEST = TRUE
  INTEGRATION = 'MYINT'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
Microsoft Azure
CREATE PIPE mypipe_azure
  AUTO_INGEST = TRUE
  INTEGRATION = 'MYINT'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
名前付き内部ステージ
現在のスキーマで、 mystage という名前の内部ステージ上のすべてのデータファイルを自動的に読み込むパイプを作成します。
CREATE PIPE mypipe_aws AUTO_INGEST = TRUE AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');