カテゴリ:

DDLデータのロード/アンロード

CREATE PIPE

インジェスチョンキューからテーブルにデータをロードするために Snowpipe が使用する COPY INTO <テーブル> ステートメントを定義するために、システムに新しいパイプを作成します。

こちらもご参照ください:

ALTER PIPEDESCRIBE PIPEDROP PIPESHOW PIPES

構文

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = <string> ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>

必須パラメーター

名前

パイプの識別子。パイプが作成されるスキーマに対して一意である必要があります。

識別子はアルファベットで始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例: "My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。

詳細については、 識別子の要件 をご参照ください。

ステートメントのコピー

キューファイルからSnowflakeテーブルにデータをロードするために使用される COPY INTO <テーブル> ステートメント。このステートメントは、パイプのテキスト/定義として機能し、 SHOW PIPES 出力に表示されます。

オプションのパラメーター

AUTO_INGEST = TRUE | FALSE

構成されたメッセージサービスからイベント通知を受信したときに、指定した外部ステージおよびオプションのパスからデータファイルを自動的にロードするかどうかを指定します。

  • TRUE は、自動データロードを有効にします。

    Snowpipeは、外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートしています。

  • FALSE は、自動データロードを無効にします。データファイルをロードするには、Snowpipe REST API エンドポイントを呼び出す必要があります。

    Snowpipeは、内部ステージ(つまり、Snowflakeの名前付きステージまたはテーブルステージであるが、ユーザーステージ ではない )、または外部ステージ(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)からのロードをサポートします。

AWS_SNS_TOPIC = 文字列

Amazon Simple Notification Service(SNS)を使用してAmazon S3ステージの AUTO_INGEST を設定する場合にのみ必要です。 S3バケットの SNS トピックのAmazonリソース名(ARN)を指定します。 CREATE PIPE ステートメントは、指定された SNS トピックにAmazon Simple Queue Service(SQS)キューをサブスクライブします。パイプは、 SNS トピックを介したイベント通知によってトリガーされた取り込みキューにファイルをコピーします。詳細については、 Amazon S3用Snowpipeの自動化 をご参照ください。

INTEGRATION = '文字列'

Microsoft Azureステージの AUTO_INGEST を構成する場合のみに必要です。 Azureストレージキューへのアクセスに使用される既存の通知統合を指定します。詳細については、次をご参照ください。

統合名はすべて大文字で入力する必要があります。

COMMENT = '文字列リテラル'

パイプのコメントを指定します。

デフォルト:値なし

使用上の注意

  • 次を 除く 、すべての COPY INTO <テーブル> コピーオプションがサポートされています。

    • FILES = ( 'ファイル名1' [ , 'ファイル名2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = 数値

    • PURGE = TRUE | FALSE (つまり、ロード中の自動パージ)

    • MATCH_BY_COLUMN_NAME = CASE_SENSITIVE | CASE_INSENSITIVE | NONE

    • FORCE = TRUE | FALSE

      REMOVE コマンドを使用して、内部(つまりSnowflake)ステージからファイルを(ロード後に)手動で削除できます。

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • PATTERN = '正規表現パターン' コピーオプションのサポートは、 プレビュー機能 として提供されます。コピーオプションは、正規表現を使用してロードするファイルのセットをフィルターします。パターンマッチングは、 AUTO_INGEST パラメーター値に応じて次のように動作します。

    • AUTO_INGEST = TRUE :正規表現は、ステージ内のファイルのリスト、および COPY INTO <テーブル> ステートメント内のオプションのパス(つまり、クラウドストレージの場所)をフィルターします。

    • :AUTO_INGEST = FALSE :正規表現は、Snowpipe REST API insertFiles エンドポイントへの呼び出しで送信されたファイルのリストをフィルターします。

  • 列の並べ替え、列の省略、およびキャスト(つまり、ロード中のデータの変換)の COPY ステートメントのソースとしてクエリを使用することはサポートされています。使用例については、 ロード中のデータの変換 をご参照ください。単純な SELECT ステートメントのみがサポートされています。 WHERE 句を使用したフィルタリングはサポートされていません。

  • パイプ定義は動的ではありません(つまり、ステージ/テーブルの名前変更やドロップなど、基になるステージまたはテーブルが変更された場合、パイプは自動的に更新されません)。代わりに、新しいパイプを作成し、今後のSnowpipe REST API 呼び出しでこのパイプ名を送信する必要があります。

重要

イベント通知を使用してデータのロードを自動化するパイプを再作成するときは、次の手順を完了することをお勧めします。

  1. パイプを一時停止します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = true を使用)。現在キューに入っているファイルがターゲットテーブルにロードされるのを待ちます。

  2. SYSTEM$PIPE_STATUS 関数をクエリし、パイプの実行状態が PAUSED であり、保留中のファイル数が0であることを確認します。

  3. パイプを再作成します( CREATE OR REPLACE PIPE を使用)。

  4. パイプをもう一度一時停止します。

  5. クラウドメッセージングサービスの構成手順を確認して、設定が依然として正確であることを確認します。

  6. パイプを再開します( ALTER PIPE ... SET PIPE_EXECUTION_PAUSED = falseを使用)。

  7. SYSTEM$PIPE_STATUS 関数を再度クエリし、パイプの実行状態が RUNNING であることを確認します。

mystage ステージにステージングされたファイルから mytable にすべてのデータをロードするパイプを現在のスキーマに作成します。

create pipe mypipe as copy into mytable from @mystage;

前の例と同じですが、データ変換があります。ステージングされたファイルの4番目と5番目の列からのデータのみを逆順でロードします。

create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);

メッセージングサービスから受信したイベント通知を使用して、データの自動ロード用として現在のスキーマにパイプを作成します。

Amazon S3

create pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Google Cloud Storage

create pipe mypipe_gcs
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Microsoft Azure

create pipe mypipe_azure
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');