Categories: Data Loading / Unloading DDL

CREATE PIPE

Creates a new pipe in the system for defining the COPY INTO <table> statement used by Snowpipe to load data from an ingestion queue into tables.

See also:

ALTER PIPE, DESCRIBE PIPE, DROP PIPE, SHOW PIPES

Syntax

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = <string> ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>

Required Parameters

name

Identifier for the pipe; must be unique for the schema in which the pipe is created.

The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.

For more details, see Identifier Requirements.
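
For example (a sketch; mytable and mystage are placeholder objects assumed to already exist), a pipe name that preserves case or contains spaces must be double-quoted:

create pipe "My Pipe" as copy into mytable from @mystage;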

copy_statement

COPY INTO <table> statement used to load data from queued files into a Snowflake table. This statement serves as the text/definition for the pipe and is displayed in the SHOW PIPES output.
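
For instance (a sketch; the objects below are placeholders), the COPY statement you provide becomes the pipe definition, which you can later inspect in the definition column of the SHOW PIPES output:

create pipe mypipe_json as copy into mytable from @mystage file_format = (type = 'JSON');

show pipes like 'mypipe_json';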

Optional Parameters

AUTO_INGEST = TRUE | FALSE

Specifies whether to automatically load data files from the specified external stage and optional path when event notifications are received from a configured message service.

  • TRUE enables automatic data loading.

    Snowpipe supports loading from external stages (Amazon S3, Google Cloud Storage, or Microsoft Azure).

  • FALSE disables automatic data loading. You must make calls to the Snowpipe REST API endpoints to load data files.

    Snowpipe supports loading from internal stages (i.e. Snowflake named stages or table stages, but not user stages) or external stages (Amazon S3, Google Cloud Storage, or Microsoft Azure); a sketch using a named internal stage follows below.
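
For example (a minimal sketch; the stage and table names are placeholders), a pipe over a named internal stage is created with AUTO_INGEST = FALSE, and files are then submitted for loading through calls to the Snowpipe REST API insertFiles endpoint:

create pipe mypipe_internal
  auto_ingest = false
  as
  copy into mytable
  from @my_internal_stage
  file_format = (type = 'CSV');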

AWS_SNS_TOPIC = string

Required only when configuring AUTO_INGEST for Amazon S3 stages using Amazon Simple Notification Service (SNS). Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket. The CREATE PIPE statement subscribes the Amazon Simple Queue Service (SQS) queue to the specified SNS topic. The pipe copies files to the ingest queue triggered by event notifications via the SNS topic. For more information, see Automating Snowpipe for Amazon S3.

INTEGRATION = 'string'

Required only when configuring AUTO_INGEST for Google Cloud Storage or Microsoft Azure stages. Specifies the existing notification integration used to access the storage queue. For more information, see Automating Snowpipe for Google Cloud Storage and Automating Snowpipe for Microsoft Azure Blob Storage.

The integration name must be typed in all uppercase.

COMMENT = 'string_literal'

Specifies a comment for the pipe.

Default: No value
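
For example (a sketch with placeholder names), a comment can document the purpose of the pipe:

create pipe mypipe_commented comment = 'Loads raw JSON event files' as copy into mytable from @mystage;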

Usage Notes

  • All COPY INTO <table> copy options are supported except for the following:

    • FILES = ( 'file_name1' [ , 'file_name2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = num

    • PURGE = TRUE | FALSE (i.e. automatic purging while loading)

      Note that you can manually remove files from an internal (i.e. Snowflake) stage (after they’ve been loaded) using the REMOVE command.

    • MATCH_BY_COLUMN_NAME = CASE_SENSITIVE | CASE_INSENSITIVE | NONE

    • FORCE = TRUE | FALSE

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • Support for the PATTERN = 'regex_pattern' copy option is provided as a preview feature. The copy option filters the set of files to load using a regular expression (see the example after these notes). Pattern matching behaves as follows depending on the AUTO_INGEST parameter value:

    • AUTO_INGEST = TRUE: The regular expression filters the list of files in the stage and optional path (i.e. cloud storage location) in the COPY INTO <table> statement.

    • AUTO_INGEST = FALSE: The regular expression filters the list of files submitted in calls to the Snowpipe REST API insertFiles endpoint.

  • Using a query as the source for the COPY statement for column reordering, column omission, and casts (i.e. transforming data during a load) is supported. For usage examples, see Transforming Data During a Load. Note that only simple SELECT statements are supported. Filtering using a WHERE clause is not supported.

  • Pipe definitions are not dynamic (i.e. a pipe is not automatically updated if the underlying stage or table changes, such as renaming or dropping the stage/table). Instead, you must create a new pipe and submit the new pipe name in future Snowpipe REST API calls.
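
To illustrate the PATTERN copy option described in the usage notes (a sketch; PATTERN support is a preview feature, and the object names are placeholders), a pipe can restrict loading to files whose names match a regular expression:

create pipe mypipe_pattern
  as
  copy into mytable
  from @mystage
  pattern = '.*sales.*[.]json'
  file_format = (type = 'JSON');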

Important

When recreating a pipe that automates data loads using event notifications, we recommend that you complete the following steps (a combined SQL sketch follows these steps):

  1. Pause the pipe (using ALTER PIPE … SET PIPE_EXECUTION_PAUSED = true). Wait for any files currently queued to be loaded into the target table.

  2. Query the SYSTEM$PIPE_STATUS function and verify that the pipe execution state is PAUSED and the pending file count is 0.

  3. Recreate the pipe (using CREATE OR REPLACE PIPE).

  4. Pause the pipe again.

  5. Review the configuration steps for your cloud messaging service to ensure the settings are still accurate.

  6. Resume the pipe (using ALTER PIPE … SET PIPE_EXECUTION_PAUSED = false).

  7. Query the SYSTEM$PIPE_STATUS function again and verify that the pipe execution state is RUNNING.
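
The following is a combined sketch of this sequence, assuming a pipe named mypipe that loads JSON files from an external stage (adapt the COPY statement and notification settings to your environment):

alter pipe mypipe set pipe_execution_paused = true;

select system$pipe_status('mypipe');

create or replace pipe mypipe
  auto_ingest = true
  as
  copy into mytable
  from @mystage
  file_format = (type = 'JSON');

alter pipe mypipe set pipe_execution_paused = true;

-- after confirming the cloud messaging configuration is still accurate:
alter pipe mypipe set pipe_execution_paused = false;

select system$pipe_status('mypipe');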

Examples

Create a pipe in the current schema that loads all the data from files staged in the mystage stage into mytable:

create pipe mypipe as copy into mytable from @mystage;

Same as the previous example, but with a data transformation. Only load data from the 4th and 5th columns in the staged files, in reverse order:

create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);

Create a pipe in the current schema for automatic loading of data using event notifications received from a messaging service:

Amazon S3

create pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Google Cloud Storage

create pipe mypipe_gcs
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Microsoft Azure

create pipe mypipe_azure
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');