Änderungsnachverfolgung mit Tabellenstreams

Ein Streamobjekt erfasst an Tabellen vorgenommene Data Manipulation Language (DML)-Änderungen, einschließlich Einfügungen, Aktualisierungen und Löschvorgänge sowie Metadaten zu jeder Änderung, sodass mit den geänderten Daten Aktionen durchgeführt werden können. Dieser Vorgang wird als Change Data Capture (CDC) bezeichnet. Ein einzelner Tabellen-Stream verfolgt die Änderungen, die an Zeilen in einer Quelltabelle vorgenommen werden. Ein Tabellen-Stream (auch einfach als „Stream“ bezeichnet) stellt eine „Änderungstabelle“ zur Verfügung, die angibt, was sich auf Zeilenebene zwischen zwei Transaktionszeitpunkten in einer Tabelle geändert hat. Dies bietet die Möglichkeit, eine Sequenz von Änderungsdatensätzen auf transaktionale Weise abzufragen und zu verbrauchen.

Streams werden für lokale Standardtabellen, externe Tabellen und Verzeichnistabellen unterstützt.

Unter diesem Thema:

Offset-Speicher

Bei der Erstellung erstellt ein Tabellenstream logisch einen ersten Snapshot jeder Zeile der Quelltabelle, indem ein Zeitpunkt (Offset genannt) als aktuelle Transaktionsversion der Tabelle initialisiert wird. Das vom Stream verwendete Änderungsnachverfolgungssystem zeichnet dann Informationen über die DML-Änderungen auf, die nach Erstellung dieses Snapshots ausgeführt wurden. Änderungsdatensätze geben den Status einer Zeile vor und nach der Änderung an. Änderungsinformationen spiegeln die Spaltenstruktur der nachverfolgten Quelltabelle wider und enthalten zusätzliche Metadatenspalten, die jedes Änderungsereignis beschreiben.

Beachten Sie, dass ein Stream selbst keine Tabellendaten enthält. Ein Stream speichert einen Offset für die Quelltabelle und gibt CDC-Datensätze zurück, indem der Versionsverlauf für die Quelltabelle genutzt wird. Wenn der erste Stream für eine Tabelle erstellt wird, wird ein Paar ausgeblendeter Spalten zur Quelltabelle hinzugefügt und das System beginnt mit dem Speichern von Metadaten zur Änderungsnachverfolgung. Diese Spalten verbrauchen nur wenig Speicherplatz. Die CDC-Datensätze, die beim Abfragen eines Streams zurückgegeben werden, basieren auf einer Kombination aus dem im Stream gespeicherten Offset und den in der Tabelle gespeicherten Änderungsverfolgungsmetadaten.

Es kann nützlich sein, sich einen Stream als Lesezeichen vorzustellen, das einen Zeitpunkt auf den Seiten eines Buches (d. h. der Quelltabelle) angibt. Ein Lesezeichen kann weggeworfen werden, und andere Lesezeichen können an anderen Stellen in einem Buch eingefügt werden. Ebenso kann ein Stream gelöscht werden, und andere Streams können zum selben oder zu unterschiedlichen Zeitpunkten erstellt werden (entweder indem die Streams zu unterschiedlichen Zeiten nacheinander erstellt werden oder indem Time Travel verwendet wird), um die Änderungsdatensätze für eine Tabelle bei gleichen oder verschiedenen Offsets zu verarbeiten.

Ein Beispiel für einen Verbraucher von CDC-Datensätzen ist eine Datenpipeline, in der nur die Daten in Stagingtabellen, die sich seit der letzten Extraktion geändert haben, transformiert und in andere Tabellen kopiert werden.

Beachten Sie, dass Streams Änderungen in materialisierten Ansichten nicht nachverfolgen können.

Tabellenversionierung

Eine neue Tabellenversion wird immer dann erstellt, wenn für eine Transaktion auf der Tabelle, die eine oder mehrere DML-Anweisungen enthält, ein Commit ausgeführt wird. In der Transaktionshistorie einer Tabelle befindet sich zwischen zwei Tabellenversionen ein Stream-Offset. Die Abfrage eines Streams gibt die Änderungen zurück, die durch Transaktionen verursacht wurden, die nach dem Offset und zum oder vor dem aktuellen Zeitpunkt mit Commit bestätigt wurden.

Das folgende Beispiel zeigt eine Quelltabelle mit 10 mit Commit bestätigten Versionen in der Zeitachse. Der Offset von Stream s1 liegt derzeit zwischen den Tabellenversionen v3 und v4. Wenn der Stream abgefragt (oder verbraucht) wird, enthalten die zurückgegebenen Datensätze alle Transaktionen zwischen der Tabellenversion v4, der Version unmittelbar nach dem Stream-Offset in der Zeitachse der Tabelle, und einschließlich v10, der neuesten mit Commit bestätigten Tabellenversion in der Zeitachse.

Stream offset example

Ein Stream stellt den minimalen Satz von Änderungen von seinem aktuellen Offset bis zur aktuellen Version der Tabelle bereit.

Mehrere Abfragen können unabhängig voneinander dieselben Änderungsdaten aus einem Stream verbrauchen, ohne den Offset zu ändern. Ein Stream erhöht den Offset nur, wenn er in einer DML-Transaktion verwendet wird. Dieses Verhalten gilt sowohl für explizite als auch für autocommit-Transaktionen. (Beim Ausführen einer DML-Anweisung wird standardmäßig implizit eine Autocommit-Transaktion gestartet, und die Transaktion wird nach Abschluss der Anweisung mit COMMIT bestätigt. Dieses Verhalten wird mit dem Parameter AUTOCOMMIT gesteuert.) Das Abfragen eines Streams allein führt nicht zum Erhöhen des Offsets, auch nicht innerhalb einer expliziten Transaktion. Der Inhalt des Streams muss in einer DML-Anweisung verarbeitet werden.

Bemerkung

Um den Offset eines Datenstreams auf die aktuelle Tabellenversion vorzuverlegen, ohne die Änderungsdaten in einer DML-Operation zu verbrauchen, führen Sie eine der folgenden Aktionen durch:

  • Erstellen Sie den Stream neu (unter Verwendung der CREATE OR REPLACE STREAM-Syntax).

  • Insert the current change data into a temporary table. In the INSERT statement, query the stream but include a WHERE clause that filters out all of the change data (e.g. WHERE 0 = 1).

Wenn eine SQL-Anweisung einen Stream innerhalb einer expliziten Transaktion abfragt, erfolgt die Abfrage des Streams an der Streamspitze (d. h. dem Zeitstempel), als die Transaktion begann und nicht als die Anweisung ausgeführt wurde. Dieses Verhalten betrifft sowohl DML-Anweisungen als auch CREATE TABLE … AS SELECT (CTAS)-Anweisungen, die eine neue Tabelle mit Zeilen aus einem vorhandenen Stream füllen.

Eine DML-Anweisung, die ein „Select“ auf einem Stream ausführt, verbraucht alle Änderungsdaten im Stream, vorausgesetzt, die Transaktion wurde erfolgreich mit Commit bestätigt. Um sicherzustellen, dass verschiedene Anweisungen auf dieselben Änderungsdatensätze im Stream zugreifen, müssen Sie sie mit einer expliziten Transaktionsanweisung (BEGIN .. COMMIT) umgeben. Dies sperrt den Stream. DML-Aktualisierungen an der Quelltabelle in parallelen Transaktionen werden vom Änderungsnachverfolgungssystem verfolgt, aktualisieren den Stream jedoch erst, wenn für die explizite Transaktionsanweisung ein Commit ausgeführt wurde und die vorhandenen Änderungsdaten verbraucht wurden.

Wiederholbare Leseisolation

Streams unterstützen wiederholbare Leseisolation. Im wiederholbaren Lesemodus sehen verschiedene SQL-Anweisungen innerhalb einer Transaktion die gleichen Datensätze in einem Stream. Dies unterscheidet sich von dem für Tabellen unterstützten Read Committed-Modus, in dem Anweisungen alle Änderungen anzeigen, die von vorherigen Anweisungen vorgenommen wurden, die in derselben Transaktion ausgeführt wurden, obwohl für diese Änderungen noch kein Commit ausgeführt wurde.

Die Delta-Datensätze, die von Streams in einer Transaktion zurückgegeben werden, beinhalten den Bereich von der aktuellen Position des Streams bis zur Startzeit der Transaktion. Die Streamposition rückt zur Transaktionsstartzeit vor, wenn der Transaktions-Commit ausgeführt wird, andernfalls bleibt sie an der gleichen Position.

Betrachten Sie das folgende Beispiel:

Zeit

Transaktion 1

Transaktion 2

1

Starten Sie eine Transaktion.

2

Abfrage-Stream s1 für Tabelle t1. Der Stream gibt die Change Data Capture-Datensätze . zwischen der aktuellen Position bis zur Startzeit von Transaktion 1 zurück. Wenn der Stream in einer DML-Anweisung . verwendet wird, wird der Stream gesperrt, um Änderungen durch gleichzeitige Transaktionen zu verhindern.

3

Aktualisieren Sie Zeilen in Tabelle t1.

4

Abfrage-Stream s1. Gibt den gleichen Status des Streams zurück, den er zu Zeit 2 hatte.

5

Führen Sie den Transaktions-Commit aus. Wenn der Stream in DML-Anweisungen innerhalb der Transaktion verbraucht wurde, rückt die Stream-Position zur Transaktionsstartzeit vor.

6

Starten Sie eine Transaktion.

7

Abfrage-Stream s1. Die Ergebnisse enthalten Tabellenänderungen, für die Transaktion 1 ein Commit ausgeführt hat.

In Transaktion 1 werden allen Abfragen an Stream s1 dieselben Datensätze angezeigt. DML-Änderungen an Tabelle t1 werden nur dann im Stream aufgezeichnet, wenn für die Transaktion ein Commit ausgeführt wurde.

In Transaktion 2 werden Abfragen an den Stream die Änderungen angezeigt, die in der Tabelle in Transaktion 1 aufgezeichnet wurden. Beachten Sie aber Folgendes: Wenn Transaktion 2 begonnen hätte, bevor das Commit für Transaktion 1 ausgeführt worden wäre, hätten Abfragen an den Stream einen Snapshot des Streams von der Position des Streams bis zur Startzeit von Transaktion 2 zurückgegeben und keine der für Transaktion 1 committeten Änderungen wäre sichtbar.

Stream-Spalten

Ein Stream speichert einen Offset für die Quelltabelle und keine tatsächlichen Tabellenspalten oder Daten. Bei Abfragen auf den Stream erfolgen der Zugriff auf die historischen Daten und die Datenrückgabe in derselben Form wie bei Abfragen auf Quelltabellen (d. h. mit denselben Spaltennamen und derselben Reihenfolge), allerdings mit den folgenden zusätzlichen Spalten:

METADATA$ACTION

Gibt die erfasste DML-Operation (INSERT, DELETE) an.

METADATA$ISUPDATE

Gibt an, ob die Operation Teil einer UPDATE-Anweisung war. Aktualisierungen von Zeilen in der Quelltabelle werden als ein Paar von DELETE- und INSERT-Datensätzen im Stream dargestellt, wobei METADATA$ISUPDATE-Werte einer Metadatenspalte auf TRUE gesetzt sind.

Beachten Sie, dass Streams die Unterschiede zwischen zwei Offsets aufzeichnen. Wenn eine Zeile hinzugefügt und dann im aktuellen Offset aktualisiert wird, ist die Deltaänderung eine neue Zeile. Die Zeile METADATA$ISUPDATE zeichnet einen FALSE-Wert auf.

METADATA$ROW_ID

Gibt die eindeutige und unveränderliche Zeilen-ID an, mit der Änderungen an bestimmten Zeilen über die Zeit verfolgt werden können.

Typen von Streams

Die folgenden Streamtypen sind basierend auf den von jedem Stream aufgezeichneten Metadaten verfügbar:

Standard

Wird für (lokale) Standardtabellen und Verzeichnistabellen unterstützt. Ein Standard-Tabellenstream (d. h. Delta-Tabellenstream) verfolgt alle DML-Änderungen an der Quelltabelle wie Einfügungen, Aktualisierungen und Löschungen (einschließlich Tabellenkürzungen). Dieser Streamtyp führt eine Verknüpfung (Join) für eingefügte und gelöschte Zeilen im Änderungsset durch, um das Delta auf Zeilenebene bereitzustellen. Als Gesamteffekt wird beispielsweise eine Zeile, die zwischen zwei Transaktionszeitpunkten in einer Tabelle eingefügt und dann gelöscht wird, im Delta entfernt (d. h. sie wird nicht zurückgegeben, wenn der Stream abgefragt wird).

Nur-Anfügen

Wird nur für Standardtabellen unterstützt. Bei einem Nur-Anfügen-Tabellenstream werden nur Zeileneinfügungen protokolliert. Aktualisierungs- und Löschoperationen (einschließlich Tabellenkürzungen) werden nicht aufgezeichnet. Wenn beispielsweise 10 Zeilen in eine Tabelle eingefügt werden und dann 5 dieser Zeilen gelöscht werden, bevor der Offset für einen Nur-Anfügen-Stream erweitert wird, zeichnet der Stream 10 Zeilen auf.

Ein Nur-Anfügen-Stream gibt nur die angefügten Zeilen zurück und kann daher im Gegensatz zu einem Standardstream viel leistungsfähiger beim Extrahieren, Laden, Transformieren (ELT) und bei ähnlichen Szenarios sein, die ausschließlich von Zeileneinfügungen abhängig sind. Beispiel: Die Quelltabelle kann unmittelbar nach dem Verarbeiten der Zeilen eines Nur-Anfügen-Streams abgeschnitten werden, und das Löschen von Datensätzen produziert keinen Overhead, wenn der Stream das nächste Mal abgefragt oder verarbeitet wird.

Nur Einfügen

Wird nur bei externen Tabellen unterstützt. Bei Nur-Einfügen-Streams werden nur Zeileneinfügungen verfolgt. Es werden keine Löschoperationen aufgezeichnet, mit denen Zeilen aus einem eingefügten Set entfernt werden (d. h. NoOps). Wenn beispielsweise zwischen zwei beliebigen Offsets „File1“ aus dem Cloudspeicherort entfernt wird, auf den in der externen Tabelle verwiesen wird, und „File2“ hinzugefügt wird, gibt der Stream nur Datensätze für die Zeilen in „File2“ zurück. Im Gegensatz zur Verfolgung von CDC-Daten für Standardtabellen kann Snowflake nicht auf die historischen Datensätze von Dateien im Cloudspeicher zugreifen.

Überschriebene oder angehängte Dateien werden im Wesentlichen wie neue Dateien behandelt: Die alte Version der Datei wird aus dem Cloudspeicher entfernt, aber der Nur-Einfügen-Stream zeichnet die Löschoperation nicht auf. Die neue Version der Datei wird dem Cloudspeicher hinzugefügt, und der Nur-Einfügen-Stream zeichnet die Zeilen als Einfügeoperationen auf. Der Stream zeichnet die Differenz zwischen alter und neuer Dateiversion nicht auf. Beachten Sie, dass Anhänge möglicherweise keine automatische Aktualisierung der Tabelle auslösen, wie z. B. bei Verwendung von Azure AppendBlobs.

Datenfluss

Die folgende Abbildung zeigt, wie sich der Inhalt eines Standard-Tabellenstreams ändert, wenn Zeilen in der Quelltabelle aktualisiert werden. Immer, wenn eine DML-Anweisung die Streaminhalte verarbeitet hat, wechselt die Streamposition zur Nachverfolgung auf den nächsten Satz an DML-Änderungen auf der Tabelle (d. h. den Änderungen in einer Tabellenversion):

Streams Example

Datenaufbewahrungsfrist und Veraltung

Ein Stream veraltet, wenn sein Offset außerhalb der Datenaufbewahrungsfrist für seine Quelltabelle liegt. Wenn ein Stream veraltet ist, kann nicht mehr auf die historischen Daten der Quelltabelle zugegriffen werden, auch nicht auf ungenutzte Änderungsdatensätze. Um neue Änderungsdatensätze für die Tabelle verfolgen zu können, müssen Sie den Stream neu erstellen (mit CREATE STREAM). Verbrauchen Sie die Stream-Datensätze innerhalb einer Transaktion während der Aufbewahrungsfrist für die Tabelle, um zu verhindern, dass der Stream veraltet. Weitere Informationen zur Datenaufbewahrungsfrist finden Sie unter Verstehen und Verwenden von Time Travel.

Wenn die Datenaufbewahrungsfrist für eine Quelltabelle weniger als 14 Tage beträgt und kein Stream verbraucht wurde, verlängert Snowflake diese Frist vorübergehend, um ein Veralten zu verhindern. Die Frist wird unabhängig von der Snowflake Edition Ihres Kontos standardmäßig auf den Offset des Streams oder maximal 14 Tage verlängert. Die maximale Anzahl von Tagen, für die Snowflake die Datenaufbewahrungsfrist verlängern kann, wird durch den Parameterwert MAX_DATA_EXTENSION_TIME_IN_DAYS bestimmt. Wenn der Stream verarbeitet wurde, wird die verlängerte Datenaufbewahrungsfrist wieder auf die Standardfrist der Tabelle verkürzt.

Die folgende Tabelle zeigt Beispielwerte für DATA_RETENTION_TIME_IN_DAYS und MAX_DATA_EXTENSION_TIME_IN_DAYS und gibt an, wie häufig die Streaminhalte verarbeitet werden sollten, um ein Veralten zu vermeiden:

DATA_RETENTION_TIME_IN_DAYS

MAX_DATA_EXTENSION_TIME_IN_DAYS

Streams in X Tagen verarbeiten

14

0

14

1

14

14

0

90

90

Führen Sie den Befehl DESCRIBE STREAM oder SHOW STREAMS aus, um festzustellen, ob ein Stream veraltet ist. Wenn der Spaltenwert STALE in der Befehlsausgabe TRUE ist, kann der Stream veraltet sein. In der Praxis kann das Lesen aus dem Stream einige Zeit nach der erwarteten STALE_TIME erfolgen. Der Stream kann jedoch während dieses Zeitraums jederzeit veralten.

Wichtig

  • Wenn Sie eine Quelltabelle neu erstellen (mit der Syntax CREATE OR REPLACE TABLE), wird ihr Verlauf gelöscht, wodurch auch alle Streams auf der Tabelle veraltet sind.

  • Wenn eine Datenbank oder ein Schema geklont wird, die bzw. das eine Quelltabelle und einen Stream enthält, kann derzeit auf keinen der nicht verbrauchten Datensätze in den Streamklon zugegriffen werden. Dieses Verhalten ist konsistent mit Time Travel für Tabellen. Wenn eine Tabelle geklont wird, beginnen die historischen Daten für den Tabellenklon zu dem Zeitpunkt, an dem der Klon erstellt wurde.

  • Das Umbenennen einer Quelltabelle führt nicht dazu, dass ein Stream unterbrochen oder veraltet wird. Wenn eine Tabelle gelöscht und eine neue Tabelle mit demselben Namen erstellt wird, werden außerdem alle mit der ursprünglichen Tabelle verknüpften Datenströme nicht mit der neuen Tabelle verknüpft.

Streams mit mehreren Verbrauchern

Wir empfehlen, dass Benutzer für jeden Verbraucher von Änderungsdatensätzen für eine Tabelle einen separaten Datenstream erstellen. „Verbraucher“ bezieht sich dabei auf eine Aufgabe, ein Skript oder einen anderen Mechanismus, bei dem Tabellenänderungsdatensätze mithilfe einer DML-Transaktion genutzt („verbraucht“) werden. Wie bereits weiter oben unter diesem Thema erwähnt, erhöht eine Stream seinen Offset nur, wenn dieser in einer DML-Transaktion verwendet wird. Verschiedene Verbraucher von Änderungsdaten in einem einzigen Datenstream rufen unterschiedliche Deltas ab, es sei denn, es wird Time Travel verwendet. Wenn die Änderungsdaten, die vom letzten Offset eines Datenstreams erfasst wurden, mit einer DML-Transaktion verbraucht werden, erhöht der Datenstrom den Offset. Die Änderungsdaten stehen für den nächsten Verbraucher nicht mehr zur Verfügung. Um die gleichen Änderungsdaten einer Tabelle zu verbrauchen, erstellen Sie mehrere Streams für die Tabelle. Ein Stream speichert nur einen Offset für die Quelltabelle und nicht tatsächliche Tabellendaten. Daher können Sie eine beliebige Anzahl von Streams für eine Tabelle erstellen, ohne dass dies signifikante Kosten verursacht.

CHANGES-Klausel: Schreibgeschützte Alternative zu Streams

Als Alternative zu Streams unterstützt Snowflake das Abfragen von Änderungsverfolgungsmetadaten für Tabellen mithilfe der CHANGES-Klausel für SELECT-Anweisungen. Die CHANGES-Klausel ermöglicht das Abfragen von Änderungsverfolgungsmetadaten zwischen zwei Zeitpunkten, ohne dass ein Tabellenstream mit einem expliziten Transaktionsoffset erstellt werden muss. Wenn Sie die CHANGES-Klausel verwenden, wird der Offset nicht erhöht (d. h. es werden keine Datensätze verarbeitet). Die Änderungsverfolgungsmetadaten zwischen verschiedenen Transaktionsstart- und -endpunkten können von mehreren Abfragen abgerufen werden. Für diese Option muss ein Transaktionsstartpunkt für die Metadaten mithilfe einer AT | BEFORE-Klausel angegeben werden. Der Endpunkt für das Änderungsverfolgungsintervall kann mit der optionalen END-Klausel festgelegt werden.

Ein Stream speichert die aktuelle Transaktionsversion einer Tabelle und ist in den meisten Szenarien die geeignete Quelle für CDC-Datensätze. Für seltene Szenarien, in denen der Offset für beliebige Zeiträume verwaltet werden muss, steht Ihnen die CHANGES-Klausel zur Verfügung.

Bemerkung

Derzeit muss eine der beiden folgenden Bedingungen erfüllt sein, bevor Änderungsverfolgungsmetadaten für eine Tabelle aufgezeichnet werden:

  • Die Änderungsverfolgung ist für die Tabelle aktiviert (mit ALTER TABLE … CHANGE_TRACKING = TRUE).

  • Für die Tabelle wird ein Stream erstellt (mit CREATE STREAM).

Beide Optionen fügen der Tabelle ein Paar versteckter Spalten hinzu und beginnen mit dem Speichern von Metadaten zur Änderungsverfolgung. Die Werte in diesen ausgeblendeten CDC-Datenspalten liefern die Eingabe für die Metadaten-Spalten des Streams. Die Spalten verbrauchen wenig Speicherplatz.

Für den Zeitraum, bevor eine dieser Bedingungen erfüllt ist, sind keine Änderungsverfolgungsmetadaten für die Tabelle verfügbar.

Streams auf freigegebenen Tabellen

Beim Erstellen von Streams auf freigegebenen Tabellen können Datenverbraucher Änderungen verfolgen, die mittels Datenmanipulationssprache (DML) an diesen Tabellen vorgenommen wurden. Diese Funktionalität ähnelt dem Erstellen und Verwenden von Streams auf „lokalen“ Tabellen (d. h. Tabelle und Stream befinden sich in demselben Konto). Unter diesem Thema werden die Schritte beschrieben, mit denen Datenanbieter ihre freigegebenen Tabellen für das Erstellen von Streams konfigurieren und Verbraucher diese Streams erstellen können.

Eine Anleitung dazu finden Sie unter:

Datenanbieter

Verwenden von Freigaben

Datenverbraucher

Datenverbraucher

Abrechnung für Streams

Wie unter Datenaufbewahrungsfrist und Veraltung (unter diesem Thema) beschrieben, verlängert Snowflake vorübergehend die Datenaufbewahrungsfrist für die Quelltabelle, wenn ein Stream nicht regelmäßig verarbeitet wird. Wenn die Datenaufbewahrungsfrist der Tabelle weniger als 14 Tage beträgt, wird der Zeitraum unabhängig von der Snowflake-Edition Ihres Kontos automatisch auf den kleineren Transaktionsoffset des Streams oder auf 14 Tage (wenn die Datenaufbewahrungsfrist der Tabelle kleiner als 14 Tage ist) verlängert.

Beachten Sie, dass eine erweiterte Datenaufbewahrungsfrist zusätzlichen Speicherplatz erfordert, was sich in Ihren monatlichen Speicherkosten widerspiegelt.

Die mit einem Stream verbundenen Hauptkosten entstehen durch die Verarbeitungszeit, die ein virtuelles Warehouse für die Abfrage des Streams benötigt. Diese Gebühren sind auf der Rechnung als übliche Snowflake-Credits ausgewiesen.

Stream-DDL

Um das Erstellen und Verwalten von Streams zu unterstützen, bietet Snowflake den folgenden Satz von speziellen DDL-Befehlen:

Darüber hinaus können Anbieter den Zugriff auf die für ELT erforderlichen Datenbankobjekte mit der folgenden DDL-Standardzugriffssteuerung anzeigen, gewähren oder widerrufen:

Erforderliche Zugriffsrechte

Das Erstellen und Verwalten von Streams erfordert eine Rolle mit mindestens den folgenden Rollenberechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE, CREATE STREAM

Tabelle

SELECT

Bemerkung

Wenn die Änderungsverfolgung für die Quelltabelle nicht aktiviert wurde (mit ALTER TABLE … SET CHANGE_TRACKING = TRUE), kann nur der Tabelleneigentümer (d. h. die Rolle mit der Berechtigung OWNERSHIP für die Tabelle) den ursprünglichen Stream für die Tabelle erstellen. Beim Erstellen des ersten Streams wird automatisch die Änderungsverfolgung auf der Tabelle aktiviert.

Das Abfragen eines Streams erfordert eine Rolle mit mindestens den folgenden Rollenberechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE

Stream

SELECT

Tabelle

SELECT

Beispiele

Beispiel 1

Das folgende Beispiel zeigt, wie sich der Inhalt eines Streams ändert, wenn auf der Quelltabelle DML-Anweisungen ausgeführt werden:

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Create a stream to track changes to date in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
  );

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members who joined the gym after a free trial period ended:
MERGE INTO members m
  USING (
    SELECT id, dt
    FROM signup s
    WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
    ON m.id = s.id
  WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current
-- transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now
-- includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

Beispiel 2

Das folgende Beispiel zeigt die Unterschiede im Verhalten von Standardstreams (Delta) und Nur-Anfügen-Streams:

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace  stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace  stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = '0';

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the streams.
create or replace  table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

Das folgende Beispiel zeigt, wie Streams in ELT-Prozessen (Extrahieren, Laden, Transformieren) verwendet werden können. In diesem Beispiel werden neue Daten, die in eine Stagingtabelle eingefügt wurden, von einem Stream verfolgt. Ein Satz von SQL-Anweisungen transformiert den Streaminhalt und fügt ihn in Produktionstabellen ein:

Beispiel 3:

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
    id number(8),
    ts TIMESTAMP_TZ
    );

CREATE OR REPLACE TABLE data_prod2 (
    id number(8),
    color VARCHAR,
    num NUMBER
    );

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

--  Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp_tz(t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+---------------------------+
|   ID | TS                        |
|------+---------------------------|
| 7077 | 2018-08-14 20:57:01 -0700 |
| 7078 | 2018-08-14 21:07:26 -0700 |
+------+---------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+