Change Tracking Using Table Streams

A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. This process is referred to as change data capture (CDC). An individual table stream tracks the changes made to rows in a source table. A table stream (also referred to as simply a “stream”) makes available a “change table” of what changed, at the row level, between two transactional points in time in a table. This allows querying and consuming a sequence of change records in a transactional fashion.


Overview of Table Streams

When created, a table stream logically takes an initial snapshot of every row in the source table by initializing a point in time (offset) as the current transactional version of the table. The change tracking system utilized by the stream then records information about the DML changes (insert, update, delete) committed after this snapshot was taken. Change records provide the state of a row before and after the change. Change information mirrors the column structure of the tracked source table and includes additional metadata columns that describe each change event.

Note that a stream itself does not contain any table data. A stream only stores the offset for the source table and returns CDC records by leveraging the versioning history for the source table. When the first stream for a table is created, a pair of hidden columns is added to the source table, and these columns begin storing change tracking metadata. These columns consume a small amount of storage. The CDC records returned when querying a stream rely on a combination of the offset stored in the stream and the change tracking metadata stored in the table.

It may be useful to think of a stream as a bookmark, which indicates a point in time in the pages of a book (i.e. the source table). A bookmark may be thrown away and other bookmarks inserted in different places in a book. So too, a stream may be dropped and other streams created at the same or different points of time (either by creating the streams consecutively at different times or by using Time Travel) to consume the change records for a table at the same or different offsets.
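To make the bookmark analogy concrete, the statements below place two streams on the same table at different points in time, the second one five minutes in the past using Time Travel. This is a minimal sketch: the table and stream names are hypothetical, and it assumes change tracking metadata already existed for the table at the earlier point (for example, because another stream was defined on it), since no change records exist for the period before change tracking starts.

```sql
-- Hypothetical table and stream names.
-- A stream whose offset is the current version of the table.
CREATE OR REPLACE STREAM orders_s_now ON TABLE orders;

-- A second stream on the same table, with its offset placed
-- 5 minutes (300 seconds) in the past using Time Travel.
-- Assumes change tracking metadata exists for that earlier point.
CREATE OR REPLACE STREAM orders_s_5min ON TABLE orders AT (OFFSET => -60*5);

-- Dropping one "bookmark" does not affect the offset of the other.
DROP STREAM orders_s_now;
```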

One example of a consumer of CDC records is a data pipeline, in which only the data in staging tables that has changed since the last extraction is transformed and copied into other tables.

Currently, streams cannot track changes in materialized views.

Table Versioning

A stream stores a point in time within the transactional version history of the source table, called an offset. The offset is the transactional point at which the stream contents were last consumed using a DML statement (or, for a stream that has never been consumed, the point at which it was created). The stream can provide the set of changes from the current offset to the current transactional time of the source table (i.e. the current version of the table). The stream maintains only the delta of the changes; if multiple DML statements change a row, the stream contains only the latest action taken on that row.

Multiple queries can independently read the same change data from a stream without changing the offset. A stream advances the offset only when it is used in a DML transaction, including autocommit transactions. An autocommit transaction runs a statement without an explicitly started transaction; by default, it is committed automatically when the statement succeeds and rolled back when it fails. This behavior is controlled with the AUTOCOMMIT parameter.
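The following sketch illustrates the difference between reading a stream and consuming it. All object names are hypothetical, and the statements assume the default AUTOCOMMIT behavior. SYSTEM$STREAM_GET_TABLE_TIMESTAMP (covered later in this topic) is used only to observe the offset before and after the DML statement.

```sql
-- Hypothetical objects created only for this illustration.
CREATE OR REPLACE TABLE src (id NUMBER, name VARCHAR);
CREATE OR REPLACE TABLE dst (id NUMBER, name VARCHAR);
CREATE OR REPLACE STREAM src_s ON TABLE src;

INSERT INTO src VALUES (1, 'a'), (2, 'b');

-- Plain queries read the change records without consuming them:
-- both SELECT statements return the same two rows.
SELECT * FROM src_s;
SELECT * FROM src_s;

-- Offset before consumption.
SELECT SYSTEM$STREAM_GET_TABLE_TIMESTAMP('src_s');

-- Using the stream in a DML statement (autocommit) consumes the change
-- records; the offset advances when the statement commits.
INSERT INTO dst (id, name)
  SELECT id, name FROM src_s WHERE METADATA$ACTION = 'INSERT';

-- The stream now returns no rows, and the reported offset has moved forward.
SELECT * FROM src_s;
SELECT SYSTEM$STREAM_GET_TABLE_TIMESTAMP('src_s');
```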

When a SQL statement queries a stream within an explicit transaction, the stream is queried at the stream advance point (i.e. the timestamp) when the transaction began rather than when the statement was run. This behavior pertains both to DML statements and CREATE TABLE … AS SELECT (CTAS) statements that populate a new table with rows from an existing stream.

The current offset for a stream can be determined by querying the SYSTEM$STREAM_GET_TABLE_TIMESTAMP function. A DML statement that selects from a stream consumes all of the change data in the stream as long as the transaction commits successfully. To ensure multiple statements access the same change records in the stream, surround them with an explicit transaction statement (BEGIN ... COMMIT). This locks the stream. DML updates to the source table in parallel transactions are tracked by the change tracking system but do not update the stream until the explicit transaction statement is committed and the existing change data is consumed.

Repeatable Read Isolation

Streams support repeatable read isolation. In repeatable read mode, multiple SQL statements within a transaction see the same set of records in a stream. This differs from the read committed mode supported for tables, in which statements see any changes made by previous statements executed within the same transaction, even though those changes are not yet committed.

The delta records returned by streams in a transaction are the range from the current position of the stream until the transaction start time. The stream position advances to the transaction start time if the transaction commits; otherwise it stays at the same position.

Consider the following example:

• Time 1 (Transaction 1): Begin transaction.

• Time 2 (Transaction 1): Query stream s1 on table t1. The stream returns the change data capture records between the current position and the Transaction 1 start time. If the stream is used in a DML statement, the stream is then locked to avoid changes by concurrent transactions.

• Time 3 (Transaction 1): Update rows in table t1.

• Time 4 (Transaction 1): Query stream s1. Returns the same state of the stream as when it was used at Time 2.

• Time 5 (Transaction 1): Commit transaction. If the stream was consumed in DML statements within the transaction, the stream position advances to the transaction start time.

• Time 6 (Transaction 2): Begin transaction.

• Time 7 (Transaction 2): Query stream s1. Results include table changes committed by Transaction 1.

Within Transaction 1, all queries to stream s1 see the same set of records. DML changes to table t1 are recorded to the stream only when the transaction is committed.

In Transaction 2, queries to the stream see the changes recorded to the table in Transaction 1. Note that if Transaction 2 had begun before Transaction 1 was committed, queries to the stream would have returned a snapshot of the stream from the position of the stream to the beginning time of Transaction 2 and would not see any changes committed by Transaction 1.

Stream Columns

A stream stores data in the same shape as the source table (i.e. the same column names and ordering) with the following additional columns:

METADATA$ACTION

Indicates the DML operation (INSERT, DELETE) recorded.

METADATA$ISUPDATE

Indicates whether the operation was part of an UPDATE statement. Updates to rows in the source table are represented as a pair of DELETE and INSERT records in the stream, with METADATA$ISUPDATE set to TRUE for both records.

Note that streams record the differences between two offsets. If a row is inserted and then updated within the current offset interval, the delta change is a single new row, and its METADATA$ISUPDATE column records FALSE.

METADATA$ROW_ID

Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.
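As a minimal sketch of how the metadata columns are typically combined, the MERGE below applies the contents of a standard stream to a replica table. It assumes hypothetical objects (src_s, a standard stream on a table with columns id and name, and src_replica, a table with the same columns) and that id is a unique key that is not deleted and re-inserted within a single change interval. The DELETE half of each update pair is filtered out, so each replica row matches at most one stream row.

```sql
-- Hypothetical objects: src_s (standard stream on a table with columns
-- id, name) and src_replica (table with the same columns, keyed on id).
MERGE INTO src_replica r
USING (
    -- Keep pure inserts, pure deletes, and the INSERT half of each update
    -- pair; drop the DELETE half (METADATA$ISUPDATE = TRUE).
    SELECT id, name, METADATA$ACTION AS change_action
    FROM src_s
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) s
ON r.id = s.id
-- A row deleted from the source is removed from the replica.
WHEN MATCHED AND s.change_action = 'DELETE' THEN DELETE
-- The INSERT half of an update carries the new column values.
WHEN MATCHED AND s.change_action = 'INSERT' THEN UPDATE SET r.name = s.name
-- A brand-new source row is added to the replica.
WHEN NOT MATCHED AND s.change_action = 'INSERT' THEN
    INSERT (id, name) VALUES (s.id, s.name);
```

Because the MERGE uses the stream as its source, it consumes the stream and advances its offset when it commits.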

Types of Streams

The following stream types are available based on the metadata recorded by each:

Standard

A standard (i.e. delta) table stream tracks all DML changes to the source table, including inserts, updates, and deletes (including table truncates). This stream type performs a join on inserted and deleted rows in the change set to provide the row-level delta. As a net effect, for example, a row that is inserted and then deleted between two transactional points of time in a table is removed in the delta (i.e. is not returned when the stream is queried).

Append-only

An append-only table stream tracks row inserts only. Update and delete operations (including table truncates) are not recorded. For example, if 10 rows are inserted into a table and then 5 of those rows are deleted before the offset for an append-only stream is advanced, the stream records 10 rows.

An append-only stream returns the appended rows only and therefore can be much more performant than a standard stream for extract, load, transform (ELT) and similar scenarios that depend exclusively on row inserts. For example, the source table can be truncated immediately after the rows in an append-only stream are consumed, and the record deletions do not contribute to the overhead the next time the stream is queried or consumed.
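The consume-then-truncate pattern described above might look like the following sketch. The table and column names are hypothetical, and the sketch assumes no other process is loading raw_events while these statements run.

```sql
-- Hypothetical staging table (raw_events) and target table (events).
CREATE OR REPLACE STREAM raw_events_s ON TABLE raw_events APPEND_ONLY = TRUE;

-- Consume the appended rows; the offset advances when this commits.
INSERT INTO events (event_id, payload)
  SELECT event_id, payload FROM raw_events_s;

-- Empty the staging table. The append-only stream does not record
-- truncates or deletes, so they add no overhead to the next read.
TRUNCATE TABLE raw_events;
```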

Insert-only

Supported on external tables only. An insert-only stream tracks row inserts only; it does not record delete operations that remove rows from an inserted set (i.e. no-ops). For example, between any two offsets, if File1 is removed from the cloud storage location referenced by the external table, and File2 is added, the stream returns records for the rows in File2 only. Unlike when tracking CDC data for standard tables, Snowflake cannot access the historical records for files in cloud storage.

Note

Support for insert-only table streams is provided as a preview feature.
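A minimal sketch of an insert-only stream, assuming a hypothetical external table named ext_files whose metadata is refreshed manually (that is, without automatic refresh configured):

```sql
-- Hypothetical external table over files in cloud storage.
CREATE OR REPLACE STREAM ext_files_s ON EXTERNAL TABLE ext_files INSERT_ONLY = TRUE;

-- Register files added to (or removed from) the storage location
-- in the external table metadata.
ALTER EXTERNAL TABLE ext_files REFRESH;

-- Returns rows from files added since the offset; rows belonging to
-- removed files are not returned.
SELECT * FROM ext_files_s;
```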

Data Flow

The following diagram shows how the contents of a standard table stream change as rows in the source table are updated. Whenever a DML statement consumes the stream contents, the stream position advances to track the next set of DML changes to the table (i.e. the changes in a table version):

Data Retention Period and Staleness

A stream becomes stale when its offset is outside of the data retention period for its source table. When a stream becomes stale, the historical data for the source table is no longer accessible, including any unconsumed change records. To track new change records for the table, recreate the stream (using CREATE STREAM). To prevent a stream from becoming stale, consume the stream records within a transaction during the retention period for the table. For more information about the data retention period, see Understanding & Using Time Travel.
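If a stream cannot be consumed often enough, one option is to lengthen the data retention period of the source table. This is a sketch with a hypothetical table name. The maximum allowed value depends on your Snowflake edition (1 day for Standard Edition), and a longer retention period increases storage costs.

```sql
-- Hypothetical table name. A longer retention period widens the window
-- in which a stream on this table must be consumed before it goes stale.
ALTER TABLE orders SET DATA_RETENTION_TIME_IN_DAYS = 7;

-- Verify the setting in the retention_time column of the output.
SHOW TABLES LIKE 'orders';
```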

If the data retention period for a source table is less than 14 days, and a stream has not been consumed, Snowflake temporarily extends this period to prevent it from going stale. The period is extended to the stream’s offset, up to a maximum of 14 days, regardless of the Snowflake edition for your account. When the stream is consumed, the extended data retention period is reduced to the default period for the table.

To determine whether a stream has become stale, execute the DESCRIBE STREAM or SHOW STREAMS command. In the command output, when the STALE column value for the stream is TRUE, the stream is stale.
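A quick staleness check might look like the following sketch, assuming a hypothetical stream named orders_s. DESCRIBE STREAM shows the properties of a single stream, and the output of SHOW STREAMS can be narrowed with RESULT_SCAN. Column names in SHOW output are lowercase, so they must be double-quoted.

```sql
-- Hypothetical stream name.
DESCRIBE STREAM orders_s;

-- List matching streams, then keep only the name and stale columns.
SHOW STREAMS LIKE 'orders_s';
SELECT "name", "stale"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```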

Note

Currently, when a database or schema that contains a source table and stream is cloned, any unconsumed records in the stream (in the clone) are inaccessible. This behavior is consistent with Time Travel for tables. If a table is cloned, historical data for the table clone begins at the time/point when the clone was created.

Multiple tasks that consume change data from a single table stream retrieve different deltas. When a task consumes the change data in a stream using a DML statement, the stream advances the offset. The change data is no longer available for the next task to consume. Currently, we recommend that only a single task consumes the change data from a stream. Multiple streams can be created for the same table and consumed by different tasks.
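The recommended layout of one stream per consuming task could be sketched as follows. All table, warehouse, stream, and task names are hypothetical. The WHEN condition uses SYSTEM$STREAM_HAS_DATA, which is not described in this topic; it skips a scheduled run when the stream has no change data.

```sql
-- Hypothetical names throughout. One stream per consumer,
-- so each task reads and advances its own offset.
CREATE OR REPLACE STREAM orders_s_audit ON TABLE orders;
CREATE OR REPLACE STREAM orders_s_report ON TABLE orders;

CREATE OR REPLACE TASK audit_task
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_s_audit')
AS
  INSERT INTO orders_audit (order_id, change_action)
    SELECT order_id, METADATA$ACTION FROM orders_s_audit;

CREATE OR REPLACE TASK report_task
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_s_report')
AS
  INSERT INTO orders_report (order_id)
    SELECT order_id FROM orders_s_report WHERE METADATA$ACTION = 'INSERT';

-- Tasks are created in a suspended state; resume them to start the schedule.
ALTER TASK audit_task RESUME;
ALTER TASK report_task RESUME;
```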

CHANGES Clause: Read-only Alternative to Streams

As an alternative to streams, Snowflake supports querying change tracking metadata for tables using the CHANGES clause for SELECT statements. The CHANGES clause enables querying change tracking metadata between two points in time without having to create a table stream with an explicit transactional offset. Using the CHANGES clause does not advance the offset (i.e. consume the records). Multiple queries can retrieve the change tracking metadata between different transactional start and endpoints. This option requires specifying a transactional start point for the metadata using an AT | BEFORE clause; the end point for the change tracking interval can be set using the optional END clause.

A stream stores the current transactional version of a table and is the appropriate source of CDC records in most scenarios. For infrequent scenarios that require managing the offset for arbitrary periods of time, the CHANGES clause is available for your use.

Note

Currently, either of the following must be true before change tracking metadata is recorded for a table:

• Change tracking is enabled on the table (using ALTER TABLE … CHANGE_TRACKING = TRUE).

• A stream is created for the table (using CREATE STREAM).

Either option adds a pair of hidden columns to the table and begins storing change tracking metadata. The columns consume a small amount of storage.

No change tracking metadata for the table is available for the period before one of these conditions is satisfied.
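As a minimal sketch of the CHANGES clause, assuming a hypothetical table named orders: change tracking is enabled first, and session variables hold the start and end points of the interval. No offset is stored or advanced, so each query can be rerun.

```sql
-- Hypothetical table name. Start recording change tracking metadata.
ALTER TABLE orders SET CHANGE_TRACKING = TRUE;

-- Capture a starting point in a session variable.
SET ts1 = (SELECT CURRENT_TIMESTAMP());

-- ... DML statements against orders run here ...

-- Capture an end point.
SET ts2 = (SELECT CURRENT_TIMESTAMP());

-- Net row-level changes between ts1 and ts2 (comparable to a standard stream).
SELECT *
FROM orders
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => $ts1)
  END (TIMESTAMP => $ts2);

-- Inserted rows only, from ts1 to the current time
-- (comparable to an append-only stream).
SELECT *
FROM orders
  CHANGES (INFORMATION => APPEND_ONLY)
  AT (TIMESTAMP => $ts1);
```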

Streams on Shared Tables

Creating streams on shared tables enables data consumers to track data manipulation language (DML) changes made in those tables. This functionality is similar to creating and using streams on “local” tables (i.e. in the same account as the stream). Setting this up involves steps for data providers (configuring shared tables for stream creation) and for consumers (creating the streams).

For instructions, see .

Billing for Streams

As described in Data Retention Period and Staleness (in this topic), when a stream is not consumed regularly, Snowflake temporarily extends the data retention period for the source table. If the data retention period for the table is less than 14 days, then behind the scenes the period is extended to the smaller of the stream transactional offset or 14 days, regardless of the Snowflake edition for your account.

Extending the data retention period requires additional storage which will be reflected in your monthly storage charges.

The main cost associated with a stream is the processing time used by a virtual warehouse to query the stream. These charges appear on your bill as familiar Snowflake credits.

Stream DDL

To support creating and managing streams, Snowflake provides the following set of special DDL commands:

• CREATE STREAM

• ALTER STREAM

• DROP STREAM

• DESCRIBE STREAM

• SHOW STREAMS

In addition, providers can view, grant, or revoke access to the necessary database objects for ELT using the following standard access control DDL:

Required Access Privileges

Creating and managing streams requires a role with a minimum of the following privileges:

• Database: USAGE

• Schema: USAGE, CREATE STREAM

• Table: SELECT

Querying a stream requires a role with a minimum of the following privileges:

• Database: USAGE

• Schema: USAGE

• Stream: SELECT

• Table: SELECT
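The privileges listed above could be granted as in the following sketch. The database, schema, table, stream, and role names are all hypothetical.

```sql
-- Role that creates and manages streams (hypothetical names).
GRANT USAGE ON DATABASE my_db TO ROLE stream_admin;
GRANT USAGE, CREATE STREAM ON SCHEMA my_db.my_schema TO ROLE stream_admin;
GRANT SELECT ON TABLE my_db.my_schema.my_table TO ROLE stream_admin;

-- Role that only queries an existing stream.
GRANT USAGE ON DATABASE my_db TO ROLE stream_reader;
GRANT USAGE ON SCHEMA my_db.my_schema TO ROLE stream_reader;
GRANT SELECT ON STREAM my_db.my_schema.my_stream TO ROLE stream_reader;
GRANT SELECT ON TABLE my_db.my_schema.my_table TO ROLE stream_reader;
```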

Examples

Example 1

The following example shows how the contents of a stream change as DML statements execute on the source table:

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
id number(8) NOT NULL,
name varchar(255) default NULL,
fee number(3) NULL
);

-- Create a stream to track changes to data in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
id number(8),
dt DATE
);

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members whose free trial period has ended
-- (members who joined more than 30 days before 2018-08-15):
MERGE INTO members m
USING (
SELECT id, dt
FROM signup s
WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
ON m.id = s.id
WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
id number(8) NOT NULL,
name varchar(255) default NULL,
fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

Example 2

The following example shows the differences in behavior between standard (delta) and append-only streams:

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = 0;

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the 2 streams.
create or replace table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

Example 3

The following example shows how streams can be used in ELT (extract, load, transform) processes. In this example, new data inserted into a staging table is tracked by a stream. A set of SQL statements transform and insert the stream contents into a set of production tables:

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
id number(8),
ts TIMESTAMP
);

CREATE OR REPLACE TABLE data_prod2 (
id number(8),
color VARCHAR,
num NUMBER
);

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

-- Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp (t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+-------------------------+
|   ID | TS                      |
|------+-------------------------|
| 7077 | 2018-08-14 20:57:01.000 |
| 7078 | 2018-08-14 21:07:26.000 |
+------+-------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+