Exemplos de fluxos¶
Este tópico fornece exemplos práticos de casos de uso para fluxos em objetos.
Neste tópico:
Fluxos em tabelas¶
Exemplo básico¶
O exemplo a seguir mostra como o conteúdo de um fluxo muda conforme instruções DML são executadas na tabela de origem:
-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);
-- Create a stream to track changes to data in the MEMBERS table.
-- The stream is created before any rows are loaded, so the inserts below are captured.
CREATE OR REPLACE STREAM member_check ON TABLE members;
-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
);
-- Load five members; every member starts with a fee of 0
INSERT INTO members (id, name, fee)
VALUES
  (1, 'Joe', 0),
  (2, 'Jane', 0),
  (3, 'George', 0),
  (4, 'Betty', 0),
  (5, 'Sally', 0);
-- Load the signup date of each member (SIGNUP.ID is joined to MEMBERS.ID later on).
-- The target columns are named explicitly so the statement does not depend on column order.
INSERT INTO signup (id, dt)
VALUES
  (1, '2018-01-01'),
  (2, '2018-02-15'),
  (3, '2018-05-01'),
  (4, '2018-07-16'),
  (5, '2018-08-21');
-- The stream records the inserted rows
SELECT * FROM member_check;
+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
| 1 | Joe | 0 | INSERT | False | d200504bf3049a7d515214408d9a804fd03b46cd |
| 2 | Jane | 0 | INSERT | False | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
| 3 | George | 0 | INSERT | False | b98ad609fffdd6f00369485a896c52ca93b92b1f |
| 4 | Betty | 0 | INSERT | False | e554e6e68293a51d8e69d68e9b6be991453cc901 |
| 5 | Sally | 0 | INSERT | False | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+
-- Apply a $90 fee to members whose free trial period has ended.
-- The subquery keeps members who signed up more than 30 days before 2018-08-15
-- (presumably the length of the free trial; the script does not define it).
MERGE INTO members m
USING (
  SELECT su.id
  FROM signup su
  WHERE DATEDIFF(day, '2018-08-15'::date, su.dt::DATE) < -30
) s
  ON m.id = s.id
WHEN MATCHED THEN
  UPDATE SET m.fee = 90;
-- Check the result: only the matched members carry the new fee
SELECT * FROM members;
+----+--------+-----+
| ID | NAME | FEE |
|----+--------+-----|
| 1 | Joe | 90 |
| 2 | Jane | 90 |
| 3 | George | 90 |
| 4 | Betty | 0 |
| 5 | Sally | 0 |
+----+--------+-----+
-- The stream contents have not been consumed yet, so the stream reports the
-- net change for each row: a single INSERT that carries the updated FEE value,
-- rather than a DELETE and INSERT pair
SELECT * FROM member_check;
+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
| 1 | Joe | 90 | INSERT | False | 957e84b34ef0f3d957470e02bddccb027810892c |
| 2 | Jane | 90 | INSERT | False | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
| 3 | George | 90 | INSERT | False | 75206259362a7c89126b7cb039371a39d821f76a |
| 4 | Betty | 0 | INSERT | False | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
| 5 | Sally | 0 | INSERT | False | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+
-- Create the production table for member details (same columns as MEMBERS)
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);
-- Insert the first batch of stream data into the production table
INSERT INTO members_prod (id, name, fee)
SELECT
  id,
  name,
  fee
FROM member_check
WHERE METADATA$ACTION = 'INSERT';
-- The stream position is advanced, so the stream is now empty
SELECT * FROM member_check;
+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+
-- Access and lock the stream by starting an explicit transaction
BEGIN;
-- Add 15 to the fee of every member who already pays one (fee > 0)
UPDATE members
SET fee = fee + 15
WHERE fee > 0;
+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
| 3 | 0 |
+------------------------+-------------------------------------+
-- Inside the open transaction the stream still appears empty: its change interval starts at the
-- current offset and ends at the current transactional time point, which is the time the
-- transaction began, so the UPDATE above is not included
SELECT * FROM member_check;
+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+
-- Commit the transaction
COMMIT;
-- The changes surface now: the stream uses the current transactional time as the end point of
-- its change interval, which now includes the UPDATE to the source table.
-- Each updated row appears as a DELETE/INSERT pair with METADATA$ISUPDATE = True
SELECT * FROM member_check;
+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
| 1 | Joe | 105 | INSERT | True | 123a45b67cd0e8f012345g01abcdef012345678a |
| 2 | Jane | 105 | INSERT | True | 456b45b67cd1e8f123456g01ghijkl123456779b |
| 3 | George | 105 | INSERT | True | 567890c89de2f9g765438j20jklmn0234567890d |
| 1 | Joe | 90 | DELETE | True | 123a45b67cd0e8f012345g01abcdef012345678a |
| 2 | Jane | 90 | DELETE | True | 456b45b67cd1e8f123456g01ghijkl123456779b |
| 3 | George | 90 | DELETE | True | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+
Diferenças entre os fluxos padrão e apenas para anexação¶
O exemplo a seguir mostra as diferenças de comportamento entre fluxos padrão (delta) e apenas para anexação:
-- Create a source table.
create or replace table t(id int, name string);
-- Create a standard stream on the source table.
create or replace stream delta_s on table t;
-- Create an append-only stream on the source table.
create or replace stream append_only_s on table t append_only=true;
-- Insert 3 rows into the source table, naming the target columns explicitly.
insert into t (id, name) values (0, 'charlie brown');
insert into t (id, name) values (1, 'lucy');
insert into t (id, name) values (2, 'linus');
-- Delete 1 of the 3 rows. ID is an integer column, so it is compared with a
-- numeric literal instead of relying on an implicit string-to-number conversion.
delete from t where id = 0;
-- The standard stream reports net changes: the row that was inserted and then deleted is absent.
select * from delta_s order by id;
+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-------+-----------------+-------------------+------------------------------------------|
| 1 | lucy | INSERT | False | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
| 2 | linus | INSERT | False | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+
-- The append-only stream does not remove the deleted row: the row with ID 0 is still
-- returned as an INSERT even though it no longer exists in the source table.
select * from append_only_s order by id;
+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+---------------+-----------------+-------------------+------------------------------------------|
| 0 | charlie brown | INSERT | False | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
| 1 | lucy | INSERT | False | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
| 2 | linus | INSERT | False | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+
-- Create a table to store the change data capture records in each of the streams.
create or replace table t2 (
  id int,
  name string,
  stream_type string default NULL
);
-- Copy the records from each stream into the new table, tagging every row with the
-- stream it came from. Reading a stream in a DML statement advances that stream's offset.
insert into t2 (id, name, stream_type)
select id, name, 'delta stream'
from delta_s;
insert into t2 (id, name, stream_type)
select id, name, 'append_only stream'
from append_only_s;
-- Update a row in the source table.
update t
set name = 'sally'
where name = 'linus';
-- The standard stream records the update operation as a DELETE/INSERT pair
-- flagged with METADATA$ISUPDATE = True.
select * from delta_s order by id;
+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-------+-----------------+-------------------+------------------------------------------|
| 2 | sally | INSERT | True | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
| 2 | linus | DELETE | True | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+
-- The append-only stream does not record the update operation, and its earlier
-- contents were already consumed by the insert into T2, so it returns no rows.
select * from append_only_s order by id;
+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+
O exemplo seguinte mostra como os fluxos podem ser utilizados em processos ELT (extrair, carregar, transformar). Neste exemplo, os novos dados inseridos em uma tabela de preparação são rastreados por um fluxo. Um conjunto de instruções SQL transforma e insere o conteúdo do fluxo em um conjunto de tabelas de produção:
Operações DML em transações explícitas¶
-- Create a staging table with a single VARIANT column that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant
);
-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;
-- Create 2 production tables to store transformed JSON data in relational columns:
-- DATA_PROD1 holds the id and timestamp of each document
CREATE OR REPLACE TABLE data_prod1 (
  id number(8),
  ts TIMESTAMP_TZ
);
-- DATA_PROD2 holds the id together with a color and a number
CREATE OR REPLACE TABLE data_prod2 (
  id number(8),
  color VARCHAR,
  num NUMBER
);
-- Load JSON data into the staging table using a COPY statement, Snowpipe, or inserts
-- (the load step itself is not shown here), then inspect the loaded rows
SELECT * FROM data_staging;
+--------------------------------------+
| RAW |
|--------------------------------------|
| { |
| "id": 7077, |
| "x1": "2018-08-14T20:57:01-07:00", |
| "x2": [ |
| { |
| "y1": "green", |
| "y2": "35" |
| } |
| ] |
| } |
| { |
| "id": 7078, |
| "x1": "2018-08-14T21:07:26-07:00", |
| "x2": [ |
| { |
| "y1": "cyan", |
| "y2": "107" |
| } |
| ] |
| } |
+--------------------------------------+
-- The stream shows the rows inserted into the staging table, one INSERT record per JSON document
SELECT * FROM data_check;
+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|--------------------------------------+-----------------+-------------------+------------------------------------------|
| { | INSERT | False | 789012e01ef4j3k890123k35mnopqr567890124j |
| "id": 7077, | | | |
| "x1": "2018-08-14T20:57:01-07:00", | | | |
| "x2": [ | | | |
| { | | | |
| "y1": "green", | | | |
| "y2": "35" | | | |
| } | | | |
| ] | | | |
| } | | | |
| { | INSERT | False | 765432u89tk3l6y456789012rst7vx678912456k |
| "id": 7078, | | | |
| "x1": "2018-08-14T21:07:26-07:00", | | | |
| "x2": [ | | | |
| { | | | |
| "y1": "cyan", | | | |
| "y2": "107" | | | |
| } | | | |
| ] | | | |
| } | | | |
+--------------------------------------+-----------------+-------------------+------------------------------------------+
-- Access and lock the stream: both INSERT statements below run in one explicit transaction
BEGIN;
-- Transform and copy JSON elements into relational columns in the production tables.
-- DATA_PROD1 receives the top-level id and the x1 timestamp of every inserted document.
INSERT INTO data_prod1 (id, ts)
SELECT
  src.raw:id,
  to_timestamp_tz(src.raw:x1)
FROM data_check src
WHERE METADATA$ACTION = 'INSERT';
-- DATA_PROD2 receives one row per element of the x2 array (y1 -> color, y2 -> num)
INSERT INTO data_prod2 (id, color, num)
SELECT
  src.raw:id,
  elem.value:y1,
  elem.value:y2
FROM data_check src,
  LATERAL FLATTEN(input => src.raw:x2) elem
WHERE METADATA$ACTION = 'INSERT';
-- Commit changes in the stream objects participating in the transaction
COMMIT;
-- Check the rows written to the first production table
SELECT * FROM data_prod1;
+------+---------------------------+
| ID | TS |
|------+---------------------------|
| 7077 | 2018-08-14 20:57:01 -0700 |
| 7078 | 2018-08-14 21:07:26 -0700 |
+------+---------------------------+
-- Check the rows written to the second production table
SELECT * FROM data_prod2;
+------+-------+-----+
| ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green | 35 |
| 7078 | cyan | 107 |
+------+-------+-----+
-- The stream is empty now that the transaction that consumed it has been committed
SELECT * FROM data_check;
+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+
Fluxos em exibições¶
Fluxo em uma exibição com junções de várias tabelas¶
-- Create multiple tables with matching column values
-- (SIGHTINGS.B_ID refers to BIRDS.ID).
CREATE TABLE birds (
  id number,
  common varchar(100),
  class varchar(100)
);
CREATE TABLE sightings (
  d date,
  loc varchar(100),
  b_id number,
  c number
);
-- Create a view that queries the tables with a join:
-- one row per sighting, enriched with the details of the bird that was seen.
CREATE VIEW bird_sightings AS
SELECT
  bd.id AS id,
  bd.common AS common_name,
  bd.class AS classification,
  sg.d AS date,
  sg.loc AS location,
  sg.c AS count
FROM birds bd
INNER JOIN sightings sg
  ON bd.id = sg.b_id;
-- Create a stream on the view.
CREATE STREAM bird_sightings_s ON VIEW bird_sightings;
-- Insert values into the tables.
INSERT INTO birds (id, common, class)
VALUES
  (1, 'Scarlet Tanager', 'P. olivacea'),
  (14, 'Mallard', 'A. platyrhynchos'),
  (48, 'Spotted Sandpiper', 'A. macularius'),
  (92, 'Great Blue Heron', 'A. herodias');
INSERT INTO sightings (d, loc, b_id, c)
VALUES
  (current_date(), 'Gibson Island', 1, 4),
  (current_date(), 'Lake Los Pajaro', 14, 12),
  (current_date(), 'Lake Los Pajaro', 92, 12),
  (current_date(), 'Gibson Island', 14, 21),
  (current_date(), 'Gibson Island', 92, 5);
-- Query the stream.
-- The stream displays a record for each row added to the view
-- (bird 48 has no sightings, so it contributes no row).
SELECT * FROM bird_sightings_s;
+----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+
| ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE |
|----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------|
| 1 | Scarlet Tanager | P. olivacea | 2021-09-07 | Gibson Island | 4 | a2522b47726ac2a922104c8e2f668d065ff6fcd0 | INSERT | False |
| 14 | Mallard | A. platyrhynchos | 2021-09-07 | Lake Los Pajaro | 12 | fceb4ad5cb6d2df2865d0f572b8a2aa98f240b70 | INSERT | False |
| 92 | Great Blue Heron | A. herodias | 2021-09-07 | Lake Los Pajaro | 12 | 0db99176fe8bd50749b2b48fb2befab416ff9272 | INSERT | False |
| 14 | Mallard | A. platyrhynchos | 2021-09-07 | Gibson Island | 21 | 2e94ef3a33e52ba5de5d816dc41c60fedf9cb1eb | INSERT | False |
| 92 | Great Blue Heron | A. herodias | 2021-09-07 | Gibson Island | 5 | a1df477ac8e388e1cf0ada77e9097c6effa346a7 | INSERT | False |
+----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+
-- Consume the stream records in a DML statement (INSERT, MERGE, etc.).
-- (The consuming statement itself is not shown in this example.)
-- Query the stream.
-- The stream is empty.
SELECT * FROM bird_sightings_s;
+----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------+
| ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE |
|----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------|
+----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------+
-- Delete a row from the birds table.
DELETE FROM birds WHERE id = 14;
-- Query the stream.
-- The stream displays two records for the single DELETE operation: bird 14 was joined to
-- two rows of the sightings table, so two rows disappear from the view.
SELECT * FROM bird_sightings_s;
+----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+
| ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE |
|----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------|
| 14 | Mallard | A. platyrhynchos | 2021-09-07 | Lake Los Pajaro | 12 | 83c22ff4be80d65a2e9776df0e35b22079cb4430 | DELETE | False |
| 14 | Mallard | A. platyrhynchos | 2021-09-07 | Gibson Island | 21 | e29cfae8c3c7d261ed903c2303f61e4d49c01ba1 | DELETE | False |
+----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+
Fluxo em uma exibição que chama uma função SQL não determinista¶
-- Create a table.
CREATE TABLE ndf (
  c1 number
);
-- Create a view that queries the table and also returns the
-- CURRENT_USER and CURRENT_TIMESTAMP values for the query transaction.
CREATE VIEW ndf_v AS
SELECT
  CURRENT_USER() AS u,
  CURRENT_TIMESTAMP() AS ts,
  c1 AS num
FROM ndf;
-- Create a stream on the view.
CREATE STREAM ndf_s ON VIEW ndf_v;
-- User PETER inserts rows into table ndf.
INSERT INTO ndf (c1)
VALUES
  (1),
  (2),
  (3);
-- User MARIE inserts rows into table ndf.
INSERT INTO ndf (c1)
VALUES
  (4),
  (5),
  (6);
-- User PETER queries the stream.
-- Every row carries the username of the querying user, including the rows inserted by MARIE.
-- Every row also carries the current timestamp for the query transaction,
-- NOT the timestamp when the row was inserted.
SELECT * FROM ndf_s;
+-------+-------------------------------+-----+-----------------+------------------------------------------+
| U | TS | NUM | METADATA$ACTION | METADATA$ROW_ID |
|-------+-------------------------------+-----+-----------------+------------------------------------------|
| PETER | 2021-08-16 11:56:33.778 -0700 | 1 | INSERT | d200504bf3049a7d515214408d9a804fd03b46cd |
| PETER | 2021-08-16 11:56:33.778 -0700 | 2 | INSERT | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
| PETER | 2021-08-16 11:56:33.778 -0700 | 3 | INSERT | b98ad609fffdd6f00369485a896c52ca93b92b1f |
| PETER | 2021-08-16 11:56:33.778 -0700 | 4 | INSERT | 62d34abc3fac85c037fb9f47f7758f08d025d9ed |
| PETER | 2021-08-16 11:56:33.778 -0700 | 5 | INSERT | e554e6e68293a51d8e69d68e9b6be991453cc901 |
| PETER | 2021-08-16 11:56:33.778 -0700 | 6 | INSERT | f6fa32c498a28b2349d2c6f6be55c30eb1d5310f |
+-------+-------------------------------+-----+-----------------+------------------------------------------+
-- User MARIE queries the same stream.
-- The rows are the same, but U and TS now hold MARIE's username
-- and the current timestamp for her query transaction in each row.
SELECT * FROM ndf_s;
+-------+-------------------------------+-----+-----------------+------------------------------------------+
| U | TS | NUM | METADATA$ACTION | METADATA$ROW_ID |
|-------+-------------------------------+-----+-----------------+------------------------------------------|
| MARIE | 2021-08-16 12:04:21.768 -0700 | 1 | INSERT | d200504bf3049a7d515214408d9a804fd03b46cd |
| MARIE | 2021-08-16 12:04:21.768 -0700 | 2 | INSERT | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
| MARIE | 2021-08-16 12:04:21.768 -0700 | 3 | INSERT | b98ad609fffdd6f00369485a896c52ca93b92b1f |
| MARIE | 2021-08-16 12:04:21.768 -0700 | 4 | INSERT | 62d34abc3fac85c037fb9f47f7758f08d025d9ed |
| MARIE | 2021-08-16 12:04:21.768 -0700 | 5 | INSERT | e554e6e68293a51d8e69d68e9b6be991453cc901 |
| MARIE | 2021-08-16 12:04:21.768 -0700 | 6 | INSERT | f6fa32c498a28b2349d2c6f6be55c30eb1d5310f |
+-------+-------------------------------+-----+-----------------+------------------------------------------+