Änderungsnachverfolgung mit Tabellenstreams

Ein Streamobjekt erfasst an Tabellen vorgenommene Data Manipulation Language (DML)-Änderungen, einschließlich Einfügungen, Aktualisierungen und Löschvorgänge sowie Metadaten zu jeder Änderung, sodass mit den geänderten Daten Aktionen durchgeführt werden können. Dieser Vorgang wird als Change Data Capture (CDC) bezeichnet. Ein einzelner Tabellen-Stream verfolgt die Änderungen, die an Zeilen in einer Quelltabelle vorgenommen werden. Ein Tabellen-Stream (auch einfach als „Stream“ bezeichnet) stellt eine „Änderungstabelle“ zur Verfügung, die angibt, was sich auf Zeilenebene zwischen zwei Transaktionszeitpunkten in einer Tabelle geändert hat. Auf diese Weise können Sie eine Folge von Änderungsdatensätzen auf transaktionale Weise abfragen und verbrauchen.

Unter diesem Thema:

Übersicht über Tabellen-Streams

Bei der Erzeugung erstellt ein Tabellen-Stream logisch einen ersten Snapshot jeder Zeile in der Quellentabelle, indem ein Zeitpunkt (Offset) als aktuelle Transaktionsversion der Tabelle initialisiert wird. Das vom Stream verwendete Änderungsnachverfolgungssystem zeichnet dann Informationen über die DML-Änderungen (Einfügen, Aktualisieren, Löschen) auf, für die nach Erstellung dieses Snapshots das Commit ausgeführt wurde. Änderungsdatensätze geben den Status einer Zeile vor und nach der Änderung an. Änderungsinformationen spiegeln die Spaltenstruktur der nachverfolgten Quelltabelle wider und enthalten zusätzliche Metadatenspalten, die jedes Änderungsereignis beschreiben.

Beachten Sie, dass ein Stream selbst keine Tabellendaten enthält. Ein Stream speichert den Offset für die Quelltabelle und gibt CDC-Datensätze zurück, indem der Versionsverlauf für die Quelltabelle genutzt wird. Wenn der erste Stream für eine Tabelle erstellt wird, wird ein Paar ausgeblendeter Spalten zur Quelltabelle hinzugefügt und beginnt mit dem Speichern von Metadaten zur Änderungsnachverfolgung. Diese Spalten verbrauchen nur wenig Speicherplatz. Die CDC-Datensätze, die beim Abfragen eines Streams zurückgegeben werden, basieren auf einer Kombination aus dem im Stream gespeicherten Offset und den in der Tabelle gespeicherten Änderungsverfolgungsmetadaten.

Es kann nützlich sein, sich einen Stream als Lesezeichen vorzustellen, das einen Zeitpunkt auf den Seiten eines Buches (d. h. der Quelltabelle) angibt. Ein Lesezeichen kann weggeworfen werden, und andere Lesezeichen können an anderen Stellen in einem Buch eingefügt werden. Ebenso kann ein Stream gelöscht werden, und andere Streams können zum selben oder zu unterschiedlichen Zeitpunkten erstellt werden (entweder indem die Streams zu unterschiedlichen Zeiten nacheinander erstellt werden oder indem Time Travel verwendet wird), um die Änderungsdatensätze für eine Tabelle bei gleichen oder verschiedenen Offsets zu verarbeiten.

Ein Beispiel für einen Verbraucher von CDC-Datensätzen ist eine Datenpipeline, in der nur die Daten in Stagingtabellen, die sich seit der letzten Extraktion geändert haben, transformiert und in andere Tabellen kopiert werden.

Beachten Sie, dass Streams Änderungen in materialisierten Ansichten nicht nachverfolgen können.

Tabellenversionierung

Ein Stream behält einen bestimmten Zeitpunkt in der transaktionsversionierten Zeitleiste der Quelltabelle bei, der als Offset bezeichnet wird und an dem Transaktionspunkt beginnt, an dem der Stream-Inhalt mithilfe einer DML-Anweisung zuletzt verbraucht wurde. Der Stream kann eine Reihe von Änderungen vom aktuellen Offset bis zur aktuellen Transaktionszeit der Quelltabelle (d. h. der aktuellen Version der Tabelle) bereitstellen. Der Stream behält nur das Delta der Änderungen bei. Wenn mehrere DML-Anweisungen eine Zeile ändern, enthält der Stream nur die letzte in dieser Zeile ausgeführte Aktion.

Mehrere Abfragen können unabhängig voneinander dieselben Änderungsdaten aus einem Stream verbrauchen, ohne den Offset zu ändern. Ein Stream erhöht den Offset nur, wenn er in einer DML-Transaktion verwendet wird, einschließlich Autocommit-Transaktionen (d. h. ohne explizites Starten einer Transaktion ausgeführt). Standardmäßig wird eine Autocommit-Transaktion bei Erfolg automatisch übertragen oder bei einem Fehler am Ende der Anweisung zurückgesetzt. Dieses Verhalten wird mit dem Parameter AUTOCOMMIT gesteuert).

Wenn eine SQL-Anweisung einen Stream innerhalb einer expliziten Transaktion abfragt, wird der Stream an der Streamspitze (d. h. dem Zeitstempel) abgefragt, als die Transaktion begann und nicht als die Anweisung ausgeführt wurde. Dieses Verhalten betrifft sowohl DML-Anweisungen als auch CREATE TABLE … AS SELECT (CTAS)-Anweisungen, die eine neue Tabelle mit Zeilen aus einem vorhandenen Stream füllen.

Der aktuelle Offset für einen Stream kann durch Abfragen der Funktion SYSTEM$STREAM_GET_TABLE_TIMESTAMP ermittelt werden.

Eine DML-Anweisung, die ein Select auf einem Stream ausführt, verbraucht alle Änderungsdaten im Stream, solange die Transaktion ein Commit erfolgreich ausführen kann. Um sicherzustellen, dass verschiedene Anweisungen auf dieselben Änderungsdatensätze im Stream zugreifen, müssen Sie sie mit einer expliziten Transaktionsanweisung (BEGIN .. COMMIT) umgeben. Dies sperrt den Stream. DML-Aktualisierungen an der Quelltabelle in parallelen Transaktionen werden vom Änderungsnachverfolgungssystem verfolgt, aktualisieren den Stream jedoch erst, wenn für die explizite Transaktionsanweisung ein Commit ausgeführt wurde und die vorhandenen Änderungsdaten verbraucht wurden.

Wiederholbare Leseisolation

Streams unterstützen wiederholbare Leseisolation. Im wiederholbaren Lesemodus sehen verschiedene SQL-Anweisungen innerhalb einer Transaktion die gleichen Datensätze in einem Stream. Dies unterscheidet sich von dem für Tabellen unterstützten Read Committed-Modus, in dem Anweisungen alle Änderungen anzeigen, die von vorherigen Anweisungen vorgenommen wurden, die in derselben Transaktion ausgeführt wurden, obwohl für diese Änderungen noch kein Commit ausgeführt wurde.

Die Delta-Datensätze, die von Streams in einer Transaktion zurückgegeben werden, beinhalten den Bereich von der aktuellen Position des Streams bis zur Startzeit der Transaktion. Die Streamposition rückt zur Transaktionsstartzeit vor, wenn der Transaktions-Commit ausgeführt wird, andernfalls bleibt sie an der gleichen Position.

Betrachten Sie das folgende Beispiel:

Zeit

Transaktion 1

Transaktion 2

1

Starten Sie eine Transaktion.

2

Abfrage-Stream s1 für Tabelle t1. Der Stream gibt die Change Data Capture-Datensätze . zwischen der aktuellen Position bis zur Startzeit von Transaktion 1 zurück. Wenn der Stream in einer DML-Anweisung . verwendet wird, wird der Stream gesperrt, um Änderungen durch gleichzeitige Transaktionen zu verhindern.

3

Aktualisieren Sie Zeilen in Tabelle t1.

4

Abfrage-Stream s1. Gibt den gleichen Status des Streams zurück, den er zu Zeit 2 hatte.

5

Führen Sie den Transaktions-Commit aus. Wenn der Stream in DML-Anweisungen innerhalb der Transaktion verbraucht wurde, rückt die Stream-Position zur Transaktionsstartzeit vor.

6

Starten Sie eine Transaktion.

7

Abfrage-Stream s1. Die Ergebnisse enthalten Tabellenänderungen, für die Transaktion 1 ein Commit ausgeführt hat.

In Transaktion 1 werden allen Abfragen an Stream s1 dieselben Datensätze angezeigt. DML-Änderungen an Tabelle t1 werden nur dann im Stream aufgezeichnet, wenn für die Transaktion ein Commit ausgeführt wurde.

In Transaktion 2 werden Abfragen an den Stream die Änderungen angezeigt, die in der Tabelle in Transaktion 1 aufgezeichnet wurden. Beachten Sie aber Folgendes: Wenn Transaktion 2 begonnen hätte, bevor das Commit für Transaktion 1 ausgeführt worden wäre, hätten Abfragen an den Stream einen Snapshot des Streams von der Position des Streams bis zur Startzeit von Transaktion 2 zurückgegeben und keine der für Transaktion 1 committeten Änderungen wäre sichtbar.

Stream-Spalten

In einem Stream werden Daten in derselben Form wie in der Quelltabelle gespeichert (d. h. mit denselben Spaltennamen und derselben Reihenfolge), mit den folgenden zusätzlichen Spalten:

METADATA$ACTION

Gibt die erfasste DML-Operation (INSERT, DELETE) an.

METADATA$ISUPDATE

Gibt an, ob die Operation Teil einer UPDATE-Anweisung war. Aktualisierungen von Zeilen in der Quelltabelle werden als ein Paar von DELETE- und INSERT-Datensätzen im Stream dargestellt, wobei METADATA$ISUPDATE-Werte einer Metadatenspalte auf TRUE gesetzt sind.

Beachten Sie, dass Streams die Unterschiede zwischen zwei Offsets aufzeichnen. Wenn eine Zeile hinzugefügt und dann im aktuellen Offset aktualisiert wird, ist die Deltaänderung eine neue Zeile. Die Zeile METADATA$ISUPDATE zeichnet einen FALSE-Wert auf.

METADATA$ROW_ID

Gibt die eindeutige und unveränderliche Zeilen-ID an, mit der Änderungen an bestimmten Zeilen über die Zeit verfolgt werden können.

Typen von Streams

Die folgenden Streamtypen sind basierend auf den von jedem Stream aufgezeichneten Metadaten verfügbar:

Standard

Ein Standard-Tabellenstream (d. h. Delta-Tabellenstream) verfolgt alle DML-Änderungen an der Quelltabelle wie Einfügungen, Aktualisierungen und Löschungen (einschließlich Tabellenkürzungen). Dieser Streamtyp führt eine Verknüpfung (Join) für eingefügte und gelöschte Zeilen im Änderungsset durch, um das Delta auf Zeilenebene bereitzustellen. Als Gesamteffekt wird beispielsweise eine Zeile, die zwischen zwei Transaktionszeitpunkten in einer Tabelle eingefügt und dann gelöscht wird, im Delta entfernt (d. h. sie wird nicht zurückgegeben, wenn der Stream abgefragt wird).

Nur-Anfügen

Bei einem Nur-Anfügen-Tabellenstream werden nur Zeileneinfügungen protokolliert. Aktualisierungs- und Löschoperationen (einschließlich Tabellenkürzungen) werden nicht aufgezeichnet. Wenn beispielsweise 10 Zeilen in eine Tabelle eingefügt werden und dann 5 dieser Zeilen gelöscht werden, bevor der Offset für einen Nur-Anfügen-Stream erweitert wird, zeichnet der Stream 10 Zeilen auf.

Ein Nur-Anfügen-Stream gibt nur die angefügten Zeilen zurück und kann daher im Gegensatz zu einem Standardstream viel leistungsfähiger beim Extrahieren, Laden, Transformieren (ELT) und bei ähnlichen Szenarios sein, die ausschließlich von Zeileneinfügungen abhängig sind. Beispiel: Die Quelltabelle kann unmittelbar nach dem Verarbeiten der Zeilen eines Nur-Anfügen-Streams abgeschnitten werden, und das Löschen von Datensätzen produziert keinen Overhead, wenn der Stream das nächste Mal abgefragt oder verarbeitet wird.

Nur Einfügen

Wird nur bei externen Tabellen unterstützt. Bei Nur-Einfügen-Streams werden nur Zeileneinfügungen verfolgt. Es werden keine Löschoperationen aufgezeichnet, mit denen Zeilen aus einem eingefügten Set entfernt werden (d. h. NoOps). Wenn beispielsweise zwischen zwei beliebigen Offsets „File1“ aus dem Cloudspeicherort entfernt wird, auf den in der externen Tabelle verwiesen wird, und „File2“ hinzugefügt wird, gibt der Stream nur Datensätze für die Zeilen in „File2“ zurück. Im Gegensatz zur Verfolgung von CDC-Daten für Standardtabellen kann Snowflake nicht auf die historischen Datensätze von Dateien im Cloudspeicher zugreifen.

Bemerkung

Die Unterstützung von Nur-Einfügen-Tabellenstreams wird als Vorschau-Funktion bereitgestellt.

Datenfluss

Die folgende Abbildung zeigt, wie sich der Inhalt eines Standard-Tabellenstreams ändert, wenn Zeilen in der Quelltabelle aktualisiert werden. Immer, wenn eine DML-Anweisung die Streaminhalte verarbeitet hat, wechselt die Streamposition zur Nachverfolgung auf den nächsten Satz an DML-Änderungen auf der Tabelle (d. h. den Änderungen in einer Tabellenversion):

Streams Example

Datenaufbewahrungsfrist und Veraltung

Ein Stream veraltet, wenn sein Offset außerhalb der Datenaufbewahrungsfrist für seine Quelltabelle liegt. Wenn ein Stream veraltet ist, kann nicht mehr auf die historischen Daten der Quelltabelle zugegriffen werden, auch nicht auf ungenutzte Änderungsdatensätze. Um neue Änderungsdatensätze für die Tabelle verfolgen zu können, müssen Sie den Stream neu erstellen (mit CREATE STREAM). Verbrauchen Sie die Stream-Datensätze innerhalb einer Transaktion während der Aufbewahrungsfrist für die Tabelle, um zu verhindern, dass der Stream veraltet. Weitere Informationen zur Datenaufbewahrungsfrist finden Sie unter Verstehen und Verwenden von Time Travel.

Wenn die Datenaufbewahrungsfrist für eine Quelltabelle weniger als 14 Tage beträgt und kein Stream verbraucht wurde, verlängert Snowflake diese Frist vorübergehend, um ein Veralten zu verhindern. Die Frist wird unabhängig von der Snowflake Edition Ihres Kontos auf den Offset des Streams oder maximal 14 Tage verlängert. Wenn der Stream verarbeitet wurde, wird die verlängerte Datenaufbewahrungsfrist wieder auf die Standardfrist der Tabelle verkürzt.

Führen Sie den Befehl DESCRIBE STREAM oder SHOW STREAMS aus, um festzustellen, ob ein Stream veraltet ist. Wenn in der Befehlsausgabe der Spaltenwert STALE für den Stream TRUE lautet, ist der Stream veraltet.

Bemerkung

Wenn eine Datenbank oder ein Schema geklont wird, die bzw. das eine Quelltabelle und einen Stream enthält, kann derzeit auf keinen der nicht verbrauchten Datensätze in den Streams (des Klons) zugegriffen werden. Dieses Verhalten ist konsistent mit Time Travel für Tabellen. Wenn eine Tabelle geklont wird, beginnen die historischen Daten für den Tabellenklon zu dem Zeitpunkt, an dem der Klon erstellt wurde.

Streamverarbeitung mithilfe von Aufgaben

Mehrere Aufgaben, die Änderungsdaten aus einem einzelnen Tabellenstream verwenden, rufen verschiedene Deltas ab. Wenn eine Aufgabe die Änderungsdaten in einem Stream mithilfe einer DML-Anweisung verarbeitet, erhöht der Stream den Offset. Die Änderungsdaten stehen für die nächste Aufgabe nicht mehr zur Verfügung. Derzeit wird empfohlen, dass die Änderungsdaten aus einem Stream nur durch eine einzige Aufgabe verarbeitet werden. Für dieselbe Tabelle können mehrere Streams erstellt und von verschiedenen Aufgaben verarbeitet werden.

CHANGES-Klausel: Schreibgeschützte Alternative zu Streams

Als Alternative zu Streams unterstützt Snowflake das Abfragen von Änderungsverfolgungsmetadaten für Tabellen mithilfe der CHANGES-Klausel für SELECT-Anweisungen. Die CHANGES-Klausel ermöglicht das Abfragen von Änderungsverfolgungsmetadaten zwischen zwei Zeitpunkten, ohne dass ein Tabellenstream mit einem expliziten Transaktionsoffset erstellt werden muss. Wenn Sie die CHANGES-Klausel verwenden, wird der Offset nicht erhöht (d. h. es werden keine Datensätze verarbeitet). Die Änderungsverfolgungsmetadaten zwischen verschiedenen Transaktionsstart- und -endpunkten können von mehreren Abfragen abgerufen werden. Für diese Option muss ein Transaktionsstartpunkt für die Metadaten mithilfe einer AT | BEFORE-Klausel angegeben werden. Der Endpunkt für das Änderungsverfolgungsintervall kann mit der optionalen END-Klausel festgelegt werden.

Ein Stream speichert die aktuelle Transaktionsversion einer Tabelle und ist in den meisten Szenarien die geeignete Quelle für CDC-Datensätze. Für seltene Szenarien, in denen der Offset für beliebige Zeiträume verwaltet werden muss, steht Ihnen die CHANGES-Klausel zur Verfügung.

Bemerkung

Derzeit muss eine der beiden folgenden Bedingungen erfüllt sein, bevor Änderungsverfolgungsmetadaten für eine Tabelle aufgezeichnet werden:

  • Die Änderungsverfolgung ist für die Tabelle aktiviert (mit ALTER TABLE … CHANGE_TRACKING = TRUE).

  • Für die Tabelle wird ein Stream erstellt (mit CREATE STREAM).

Beide Optionen fügen der Tabelle ein Paar versteckter Spalten hinzu und beginnen mit dem Speichern von Metadaten zur Änderungsverfolgung. Die Spalten verbrauchen wenig Speicherplatz.

Für den Zeitraum, bevor eine dieser Bedingungen erfüllt ist, sind keine Änderungsverfolgungsmetadaten für die Tabelle verfügbar.

Streams auf freigegebenen Tabellen

Beim Erstellen von Streams auf freigegebenen Tabellen können Datenverbraucher Änderungen verfolgen, die mittels Datenmanipulationssprache (DML) an diesen Tabellen vorgenommen wurden. Diese Funktionalität ähnelt dem Erstellen und Verwenden von Streams auf „lokalen“ Tabellen (d. h. Tabelle und Stream befinden sich in demselben Konto). Unter diesem Thema werden die Schritte beschrieben, mit denen Datenanbieter ihre freigegebenen Tabellen für das Erstellen von Streams konfigurieren und Verbraucher diese Streams erstellen können.

Eine Anleitung dazu finden Sie unter:

Abrechnung für Streams

Wie unter Datenaufbewahrungsfrist und Veraltung (unter diesem Thema) beschrieben, verlängert Snowflake vorübergehend die Datenaufbewahrungsfrist für die Quelltabelle, wenn ein Stream nicht regelmäßig verarbeitet wird. Wenn die Datenaufbewahrungsfrist der Tabelle weniger als 14 Tage beträgt, wird der Zeitraum unabhängig von der Snowflake-Edition Ihres Kontos automatisch auf den kleineren Transaktionsoffset des Streams oder auf 14 Tage (wenn die Datenaufbewahrungsfrist der Tabelle kleiner als 14 Tage ist) verlängert.

Beachten Sie, dass eine erweiterte Datenaufbewahrungsfrist zusätzlichen Speicherplatz erfordert, was sich in Ihren monatlichen Speicherkosten widerspiegelt.

Die mit einem Stream verbundenen Hauptkosten entstehen durch die Verarbeitungszeit, die ein virtuelles Warehouse für die Abfrage des Streams benötigt. Diese Gebühren sind auf der Rechnung als übliche Snowflake-Credits ausgewiesen.

Stream-DDL

Um das Erstellen und Verwalten von Streams zu unterstützen, bietet Snowflake den folgenden Satz von speziellen DDL-Befehlen:

Darüber hinaus können Anbieter den Zugriff auf die für ELT erforderlichen Datenbankobjekte mit der folgenden DDL-Standardzugriffssteuerung anzeigen, gewähren oder widerrufen:

Erforderliche Zugriffsrechte

Das Erstellen und Verwalten von Streams erfordert eine Rolle mit mindestens den folgenden Rollenberechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE, CREATE STREAM

Tabelle

SELECT

Das Abfragen eines Streams erfordert eine Rolle mit mindestens den folgenden Rollenberechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE

Stream

SELECT

Tabelle

SELECT

Beispiele

Beispiel 1

Das folgende Beispiel zeigt, wie sich der Inhalt eines Streams ändert, wenn auf der Quelltabelle DML-Anweisungen ausgeführt werden:

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Create a stream to track changes to date in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
  );

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members who joined the gym after a free trial period ended:
MERGE INTO members m
  USING (
    SELECT id, dt
    FROM signup s
    WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
    ON m.id = s.id
  WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

Beispiel 2

Das folgende Beispiel zeigt die Unterschiede im Verhalten von Standardstreams (Delta) und Nur-Anfügen-Streams:

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace  stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace  stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = '0';

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the 3 streams.
create or replace  table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

Das folgende Beispiel zeigt, wie Streams in ELT-Prozessen (Extrahieren, Laden, Transformieren) verwendet werden können. In diesem Beispiel werden neue Daten, die in eine Stagingtabelle eingefügt wurden, von einem Stream verfolgt. Ein Satz von SQL-Anweisungen transformiert den Streaminhalt und fügt ihn in Produktionstabellen ein:

Beispiel 3:

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
    id number(8),
    ts TIMESTAMP
    );

CREATE OR REPLACE TABLE data_prod2 (
    id number(8),
    color VARCHAR,
    num NUMBER
    );

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

--  Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp (t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+-------------------------+
|   ID | TS                      |
|------+-------------------------|
| 7077 | 2018-08-14 20:57:01.000 |
| 7078 | 2018-08-14 21:07:26.000 |
+------+-------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+