CREATE STREAM

Creates a new stream in the current/specified schema or replaces an existing stream. A stream records data manipulation language (DML) changes made to a table, including information about inserts, updates, and deletes. The table for which changes are recorded is called the source table.

In addition, this command supports the following variant:

  • CREATE STREAM … CLONE (creates a clone of an existing stream)

See also:

ALTER STREAM, DROP STREAM, SHOW STREAMS

Syntax

-- Table stream
CREATE [ OR REPLACE ] STREAM [ IF NOT EXISTS ]
  <name>
  [ COPY GRANTS ]
  ON TABLE <table_name>
  [ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ) ]
  [ APPEND_ONLY = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

-- External table stream
CREATE [ OR REPLACE ] STREAM [ IF NOT EXISTS ]
  <name>
  [ COPY GRANTS ]
  ON EXTERNAL TABLE <external_table_name>
  [ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ) ]
  [ INSERT_ONLY = TRUE ]
  [ COMMENT = '<string_literal>' ]

Variant Syntax

CREATE STREAM … CLONE

Creates a new stream with the same definition as the source stream. The clone inherits the current offset (i.e. the current transactional version of the table) from the source stream.

CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream>
  [ COPY GRANTS ]
  [ ... ]

For more information about cloning, see CREATE <object> … CLONE.
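
As a minimal sketch of the variant above, the following statements create a stream and then clone it. The names events, events_stream, and events_stream_copy are hypothetical and used only for illustration.

```sql
-- Hypothetical source table and stream (names are assumptions, not part of this topic).
CREATE OR REPLACE TABLE events (id INTEGER);
CREATE OR REPLACE STREAM events_stream ON TABLE events;

-- Clone the stream. Per the description above, the clone inherits
-- the current offset from the source stream.
CREATE STREAM events_stream_copy CLONE events_stream;

-- List both streams to compare their definitions.
SHOW STREAMS LIKE 'events_stream%';
```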

Required Parameters

name

String that specifies the identifier (i.e. name) for the stream; must be unique for the schema in which the stream is created.

In addition, the identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.

For more details, see Identifier Requirements.

table_name

String that specifies the identifier (i.e. name) for the table whose changes are tracked by the stream (i.e. the source table).

Access control

To query a stream, a role must have the SELECT privilege on the underlying table.

external_table_name

String that specifies the identifier (i.e. name) for the external table whose changes are tracked by the stream (i.e. the source external table).

Access control

To query a stream, a role must have the SELECT privilege on the underlying external table.

Optional Parameters

COPY GRANTS

Specifies to retain the access permissions from the original stream when a new stream is created using any of the following CREATE STREAM variants:

  • CREATE OR REPLACE STREAM

  • CREATE STREAM … CLONE

The parameter copies all permissions, except OWNERSHIP, from the existing stream to the new stream. By default, the role that executes the CREATE STREAM command owns the new stream.

Note

  • If the CREATE STREAM statement references more than one stream (e.g. CREATE OR REPLACE STREAM t1 CLONE t2;), the COPY GRANTS clause gives precedence to the stream being replaced.

  • The SHOW GRANTS output for the replacement stream lists the grantee for the copied privileges as the role that executed the CREATE STREAM statement, with the current timestamp when the statement was executed.

  • The operation to copy grants occurs atomically in the CREATE STREAM command (i.e. within the same transaction).

Note

This parameter is not supported currently.

{ AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } )

Creates a stream on a table at a specific time/point in the past (using Time Travel). The AT | BEFORE clause determines the point in the past from which historical data is requested for the table:

  • The AT keyword specifies that the request is inclusive of any changes made by a statement or transaction with a timestamp equal to the specified parameter.

  • The BEFORE keyword specifies that the request refers to a point immediately preceding the specified parameter.

Note

Currently, a stream must be created on a table before change tracking information is recorded for the table. If no stream existed on the table at the point in the past specified in the AT | BEFORE clause, the CREATE STREAM statement fails. No stream can be created at a time in the past before change tracking was recorded.

APPEND_ONLY = TRUE | FALSE

Specifies whether this is an append-only stream. Append-only streams track row inserts only. Update and delete operations (including table truncates) are not recorded. For example, if 10 rows are inserted into a table and then 5 of those rows are deleted before the offset for an append-only stream is advanced, the stream records 10 rows.

This type of stream improves query performance over standard streams and is very useful for extract, load, transform (ELT) and similar scenarios that depend exclusively on row inserts.

A standard stream joins the deleted and inserted rows in the change set to determine which rows were deleted and which were updated. An append-only stream returns the appended rows only and therefore can be much more performant than a standard stream. For example, the source table can be truncated immediately after the rows in an append-only stream are consumed, and the record deletions do not contribute to the overhead the next time the stream is queried or consumed.

Default

FALSE
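
The insert-then-delete scenario described above can be sketched as follows. The table and stream names (orders_raw, orders_raw_appends) are hypothetical, and the expected count is taken from the description above rather than from a recorded run.

```sql
-- Hypothetical table and append-only stream (names are assumptions).
CREATE OR REPLACE TABLE orders_raw (id INTEGER, note VARCHAR);
CREATE OR REPLACE STREAM orders_raw_appends ON TABLE orders_raw APPEND_ONLY = TRUE;

-- Insert 10 rows.
INSERT INTO orders_raw VALUES
  (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'),
  (6, 'f'), (7, 'g'), (8, 'h'), (9, 'i'), (10, 'j');

-- Delete 5 of those rows before the stream offset is advanced.
DELETE FROM orders_raw WHERE id <= 5;

-- An append-only stream does not record the deletes, so, per the
-- description above, this count should still be 10.
SELECT COUNT(*) FROM orders_raw_appends;
```

A standard stream on the same table (APPEND_ONLY = FALSE) would instead reflect the net change between the two offsets.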

INSERT_ONLY = TRUE | FALSE

Specifies whether this is an insert-only stream. Insert-only streams track row inserts only; they do not record delete operations that remove rows from an inserted set (i.e. no-ops). For example, between any two offsets, if File1 is removed from the cloud storage location referenced by the external table and File2 is added, the stream returns records for the rows in File2 only. Unlike when tracking CDC data for standard tables, Snowflake cannot access the historical records for files in cloud storage.

Note

Support for insert-only table streams is provided as a preview feature.

Default

FALSE

COMMENT = 'string_literal'

String (literal) that specifies a comment for the stream.

Default: No value

Output

The output for a stream includes the same columns as the source table along with the following additional columns:

  • METADATA$ACTION: Specifies the action (INSERT or DELETE).

  • METADATA$ISUPDATE: Specifies whether the action recorded (INSERT or DELETE) is part of an UPDATE applied to the rows in the source table.

    Note that streams record the differences between two offsets. If a row is added and then updated in the current offset, the delta change is a new row. The METADATA$ISUPDATE column for that row records a FALSE value.

  • METADATA$ROW_ID: Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.
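
To illustrate the metadata columns listed above, the following sketch queries a standard stream after an update and an insert on its source table. The names members and members_stream are hypothetical.

```sql
-- Hypothetical table and stream (names are assumptions).
CREATE OR REPLACE TABLE members (id INTEGER, name VARCHAR);
INSERT INTO members VALUES (1, 'Ana'), (2, 'Bo');

-- The stream tracks changes made after this point.
CREATE OR REPLACE STREAM members_stream ON TABLE members;

UPDATE members SET name = 'Bob' WHERE id = 2;
INSERT INTO members VALUES (3, 'Cy');

-- In a standard stream, the update should appear as a DELETE/INSERT pair
-- with METADATA$ISUPDATE set to TRUE, and the new row as an INSERT
-- with METADATA$ISUPDATE set to FALSE.
SELECT id, name, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM members_stream
ORDER BY id, METADATA$ACTION;
```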

Usage Notes

  • Creating a stream requires a role that has been explicitly granted the following privileges, along with the USAGE privilege on the database and schema:

    • Schema: CREATE STREAM

    • Source table: SELECT

  • A stream can be queried multiple times to update multiple objects in the same transaction and it will return the same data.

  • The stream position (i.e. offset) is advanced when the stream is used in a DML statement. The position is updated at the end of the transaction to the beginning timestamp of the transaction. The stream describes change records starting from the current position of the stream and ending at the current transactional timestamp.

    To ensure multiple statements access the same change records in the stream, surround them with an explicit transaction statement (BEGIN … COMMIT). An explicit transaction locks the stream, so that DML updates to the source table are not reported to the stream until the transaction is committed.

  • Streams have no Fail-safe period or Time Travel retention period. The metadata in these objects cannot be recovered if a stream is dropped.

  • When the first stream for a table is created, a pair of hidden columns is added to the table and begins storing change tracking metadata. The columns consume a small amount of storage.
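
The privilege and transaction notes above can be sketched as follows. All object and role names (mydb, stream_builder, tickets, tickets_stream, tickets_added, tickets_removed) are hypothetical, and the role is assumed to exist already.

```sql
-- Privileges needed by a hypothetical role to create a stream on mydb.public.tickets.
GRANT USAGE ON DATABASE mydb TO ROLE stream_builder;
GRANT USAGE ON SCHEMA mydb.public TO ROLE stream_builder;
GRANT CREATE STREAM ON SCHEMA mydb.public TO ROLE stream_builder;
GRANT SELECT ON TABLE mydb.public.tickets TO ROLE stream_builder;

-- Target tables are created before the transaction starts, because DDL
-- statements commit any open transaction.
CREATE TABLE IF NOT EXISTS mydb.public.tickets_added (id INTEGER, subject VARCHAR);
CREATE TABLE IF NOT EXISTS mydb.public.tickets_removed (id INTEGER);

-- Both INSERT statements read the same change records. The stream offset
-- advances only when the transaction commits.
BEGIN;

INSERT INTO mydb.public.tickets_added
  SELECT id, subject
  FROM mydb.public.tickets_stream
  WHERE METADATA$ACTION = 'INSERT';

INSERT INTO mydb.public.tickets_removed
  SELECT id
  FROM mydb.public.tickets_stream
  WHERE METADATA$ACTION = 'DELETE'
    AND METADATA$ISUPDATE = FALSE;

COMMIT;
```

The example assumes a source table with id and subject columns. Running the two INSERT statements outside an explicit transaction would advance the offset after the first statement, leaving the second with no change records to read.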

Examples

Creating a Table Stream

Create a stream on the mytable table:

CREATE STREAM mystream ON TABLE mytable;

Using Time Travel with the Source Table

Create a stream on the mytable table as it existed before the date and time in the specified timestamp:

CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));

Create a stream on the mytable table as it existed exactly at the date and time of the specified timestamp:

CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));

Create a stream on the mytable table as it existed 5 minutes ago:

CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);

Create a stream on the mytable table that includes transactions up to, but not including, any changes made by the specified statement:

CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');

Creating an Insert-only Stream on an External Table

Create an external table stream and query the change data capture records in the stream, which track the records added to the external table metadata:

-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE EXTERNAL TABLE my_ext_table (
  date_part date AS (to_date(substr(metadata$filename, 1, 10), 'YYYY/MM/DD')),
  ts timestamp AS (value:time::timestamp),
  user_id varchar AS (value:userId::varchar),
  color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
  LOCATION=@my_ext_stage
  AUTO_REFRESH = false
  FILE_FORMAT=(TYPE=JSON);

-- Create a stream on the external table
CREATE STREAM my_ext_table_stream ON EXTERNAL TABLE my_ext_table INSERT_ONLY = TRUE;

-- Execute SHOW STREAMS.
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on                    | name                   | database_name | schema_name | owner        | comment   | table_name                         | type  | stale | mode        |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM    | MYDB          | PUBLIC      | MYROLE       |           | MYDB.PUBLIC.MY_EXT_TABLE           | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+

-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.

-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;

-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE                                  | DATE_PART  | TS                      | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME                           |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| {                                      | 2020-08-05 | 2020-08-05 15:57:01.000 | user25  | green | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "green",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:57:01-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user25"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
| {                                      | 2020-08-05 | 2020-08-05 15:58:02.000 | user56  | brown | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "brown",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:58:02-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user56"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+