CREATE PIPE¶
Creates a new pipe in the system for defining the COPY INTO <table> statement used by Snowpipe to load data from an ingestion queue into tables.
- See also:
Syntax¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
Required parameters¶
name
Identifier for the pipe; must be unique for the schema in which the pipe is created.
The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g.
"My object"
). Identifiers enclosed in double quotes are also case-sensitive.For more details, see Identifier requirements.
copy_statement
COPY INTO <table> statement used to load data from queued files into a Snowflake table. This statement serves as the text/definition for the pipe and is displayed in the SHOW PIPES output.
Note
We currently do not recommend using the following functions in the copy_statement
for Snowpipe:
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
It is a known issue that the time values inserted using these functions can be a few hours earlier than the LOAD_TIME values returned by the COPY_HISTORY function or the COPY_HISTORY view.
It is recommended to query METADATA$START_SCAN_TIME instead, which provides a more accurate representation of record loading.
Optional parameters¶
AUTO_INGEST = TRUE | FALSE
Specifies whether to automatically load data files from the specified external stage and optional path when event notifications are received from a configured message service:
TRUE
enables automatic data loading.Snowpipe supports loading from external stages (Amazon S3, Google Cloud Storage, or Microsoft Azure).
FALSE
disables automatic data loading. You must make calls to the Snowpipe REST API endpoints to load data files.Snowpipe supports loading from internal stages (i.e. Snowflake named stages or table stages, but not user stages) or external stage (Amazon S3, Google Cloud Storage, or Microsoft Azure).
ERROR_INTEGRATION = 'integration_name'
Required only when configuring Snowpipe to send error notifications to a cloud messaging service.
Specifies the name of the notification integration used to communicate with the messaging service. For more information, see Snowpipe error notifications.
AWS_SNS_TOPIC = 'string'
Required only when configuring AUTO_INGEST for Amazon S3 stages using SNS.
Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket. The CREATE PIPE statement subscribes the Amazon Simple Queue Service (SQS) queue to the specified SNS topic. The pipe copies files to the ingest queue triggered by event notifications via the SNS topic. For more information, see Automating Snowpipe for Amazon S3.
INTEGRATION = 'string'
Required only when configuring AUTO_INGEST for Google Cloud Storage or Microsoft Azure stages.
Specifies the existing notification integration used to access the storage queue. For more information, see:
The integration name must be typed in all uppercase.
COMMENT = 'string_literal'
Specifies a comment for the pipe.
Default: No value
Usage notes¶
This SQL command requires the following minimum permissions:
Privilege
Object
Notes
CREATE PIPE
Schema
USAGE
Stage in the pipe definition
External stages only
READ
Stage in the pipe definition
Internal stages only
SELECT, INSERT
Table in the pipe definition
SQL operations on schema objects also require the USAGE privilege on the database and schema that contain the object.
All COPY INTO <table> copy options are supported except for the following:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = num
PURGE = TRUE | FALSE
(i.e. automatic purging while loading)FORCE = TRUE | FALSE
Note that you can manually remove files from an internal (i.e. Snowflake) stage (after they’ve been loaded) using the REMOVE command.
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
The
PATTERN = 'regex_pattern'
copy option filters the set of files to load using a regular expression. Pattern matching behaves as follows depending on the AUTO_INGEST parameter value:AUTO_INGEST = TRUE
: The regular expression filters the list of files in the stage and optional path (i.e. cloud storage location) in the COPY INTO <table> statement.:AUTO_INGEST = FALSE
: The regular expression filters the list of files submitted in calls to the Snowpipe REST APIinsertFiles
endpoint.
Note that Snowpipe trims any path segments in the stage definition from the storage location and applies the regular expression to any remaining path segments and filenames. To view the stage definition, execute the DESCRIBE STAGE command for the stage. The URL property consists of the bucket or container name and zero or more path segments. For example, if the FROM location in a COPY INTO <table> statement is
@s/path1/path2/
and the URL value for stage@s
iss3://mybucket/path1/
, then Snowpipe trims/path1/
from the storage location in the FROM clause and applies the regular expression topath2/
plus the filenames in the path.Important
Snowflake recommends that you enable cloud event filtering for Snowpipe to reduce costs, event noise, and latency. Only use the PATTERN option when your cloud provider’s event filtering feature is not sufficient. For more information about configuring event filtering for each cloud provider, see the following pages:
Amazon S3: Configuring event notifications using object key name filtering
Microsoft Azure Event Grid: Understand event filtering for Event Grid subscriptions
Google Cloud Pub/Sub: Filtering messages
Using a query as the source for the COPY statement for column reordering, column omission, and casts (i.e. transforming data during a load) is supported. For usage examples, see Transforming data during a load. Note that only simple SELECT statements are supported. Filtering using a WHERE clause is not supported.
Pipe definitions are not dynamic (i.e. a pipe is not automatically updated if the underlying stage or table changes, such as renaming or dropping the stage/table). Instead, you must create a new pipe and submit this pipe name in future Snowpipe REST API calls.
Regarding metadata:
Attention
Customers should ensure that no personal data (other than for a User object), sensitive data, export-controlled data, or other regulated data is entered as metadata when using the Snowflake service. For more information, see Metadata fields in Snowflake.
CREATE OR REPLACE <object> statements are atomic. That is, when an object is replaced, the old object is deleted and the new object is created in a single transaction.
Important
If you recreate a pipe (using the CREATE OR REPLACE PIPE syntax), see Recreating pipes for related considerations and best practices.
Examples¶
Create a pipe in the current schema that loads all the data from files staged in the mystage
stage into mytable
:
CREATE PIPE mypipe AS COPY INTO mytable FROM @mystage FILE_FORMAT = (TYPE = 'JSON');
Same as the previous example, but with a data transformation. Only load data from the 4th and 5th columns in the staged files, in reverse order:
CREATE PIPE mypipe2 AS COPY INTO mytable(C1, C2) FROM (SELECT $5, $4 FROM @mystage) FILE_FORMAT = (TYPE = 'JSON');
Create a pipe that loads all the data into columns in the target table that match corresponding columns represented in the data. Column names are case-insensitive.
In addition, load metadata from the METADATA$START_SCAN_TIME and METADATA$FILENAME metadata columns to the columns named c1
and c2
.
CREATE PIPE mypipe3 AS (COPY INTO mytable FROM @mystage MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME) FILE_FORMAT = (TYPE = 'JSON'));
Create a pipe in the current schema for automatic loading of data using event notifications received from a messaging service:
Amazon S3
CREATE PIPE mypipe_s3 AUTO_INGEST = TRUE AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');Google Cloud Storage
CREATE PIPE mypipe_gcs AUTO_INGEST = TRUE INTEGRATION = 'MYINT' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');Microsoft Azure
CREATE PIPE mypipe_azure AUTO_INGEST = TRUE INTEGRATION = 'MYINT' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');