Kategorien:

Datenpipeline DDL

CREATE STREAM

Erstellt einen neuen Stream im aktuellen/angegebenen Schema oder ersetzt einen bestehenden Stream. Ein Stream zeichnet an einer Tabelle vorgenommene Data Manipulation Language (DML)-Änderungen auf, einschließlich Einfügungen, Aktualisierungen und Löschvorgängen. Die Tabelle, für die Änderungen aufgezeichnet werden, wird als Quelltabelle bezeichnet.

Darüber hinaus unterstützt dieser Befehl die folgende Variante:

  • CREATE STREAM … CLONE (erstellt einen Klon eines bestehenden Streams)

Siehe auch:

ALTER STREAM, DROP STREAM, SHOW STREAMS

Unter diesem Thema:

Syntax

-- Table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON TABLE <table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ APPEND_ONLY = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

-- External table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON EXTERNAL TABLE <external_table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ INSERT_ONLY = TRUE ]
  [ COMMENT = '<string_literal>' ]

Syntaxvariante

CREATE STREAM … CLONE

Erstellt einen neuen Stream mit derselben Definition wie der Quellstream. Der Klon erbt den aktuellen Offset (d. h. die aktuelle Transaktionsversion der Tabelle) vom Quell-Stream.

CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream>
  [ COPY GRANTS ]
  [ ... ]

Weitere Informationen zum Klonen finden Sie unter CREATE <Objekt> … CLONE.

Erforderliche Parameter

Name

Zeichenfolge, die den Bezeichner (d. h. den Namen) für den Stream angibt. Der Wert muss für das Schema, in dem der Stream erstellt wird, eindeutig sein.

Darüber hinaus muss der Bezeichner mit einem Buchstaben beginnen und darf keine Leer- oder Sonderzeichen enthalten, es sei denn, die gesamte Bezeichnerzeichenfolge wird in doppelte Anführungszeichen gesetzt (z. B. "My object"). Bei Bezeichnern, die in doppelte Anführungszeichen eingeschlossen sind, ist auch die Groß- und Kleinschreibung zu beachten.

Weitere Details dazu finden Sie unter Anforderungen an Bezeichner.

Tabellenname

Zeichenfolge, die den Bezeichner (d. h. den Namen) für die Tabelle angibt, deren Änderungen vom Stream verfolgt werden (d. h. die Quelltabelle).

Zugriffssteuerung

Um einen Stream abzufragen, muss eine Rolle über die Berechtigung SELECT für die zugrunde liegende Tabelle verfügen.

Name_der_externen_Tabelle

Zeichenfolge, die den Bezeichner (d. h. den Namen) für die Tabelle angibt, deren Änderungen vom Stream verfolgt werden sollen (d. h. die externe Quelltabelle).

Zugriffssteuerung

Um einen Stream abzufragen, muss eine Rolle über die Berechtigung SELECT für die zugrunde liegende externe Tabelle verfügen.

Optionale Parameter

COPY GRANTS

Gibt an, dass die Zugriffsberechtigungen aus dem Original-Stream beibehalten werden, wenn mit einer der folgenden CREATE STREAM-Varianten ein neuer Stream erstellt wird:

  • CREATE OR REPLACE STREAM

  • CREATE STREAM … CLONE

Der Parameter kopiert alle Berechtigungen, mit Ausnahme von OWNERSHIP, aus dem bestehenden Stream in den neuen Stream. Standardmäßig ist die Rolle, die den Befehl CREATE STREAM ausführt, Eigentümer des neuen Streams.

Bemerkung

  • Wenn die CREATE STREAM-Anweisung auf mehr als einen Stream verweist (z. B. create or replace stream t1 clone t2;), gibt die COPY GRANTS-Klausel dem zu ersetzenden Stream den Vorrang.

  • Die SHOW GRANTS-Ausgabe für den Ersetzungsstream listet den Berechtigten für die kopierten Berechtigungen als Rolle auf, die die CREATE STREAM-Anweisung ausgeführt hat, und mit dem aktuellen Zeitstempel für die Ausführung der Anweisung.

  • Die Operation zum Kopieren von Berechtigungen erfolgt atomar im Befehl CREATE STREAM (d. h. innerhalb derselben Transaktion).

Bemerkung

Dieser Parameter wird derzeit nicht unterstützt.

AT | BEFORE TIMESTAMP => <Zeitstempel> | OFFSET => <time_difference> | STATEMENT => <ID>

Erstellt einen Stream für eine Tabelle zu einem bestimmten Zeitpunkt in der Vergangenheit (mittels Time Travel). Die AT | BEFORE-Klausel bestimmt den Zeitpunkt in der Vergangenheit, ab dem historische Daten für die Tabelle angefordert werden:

  • Das Schlüsselwort AT gibt an, dass die Anforderung alle Änderungen beinhaltet, die durch eine Anweisung oder Transaktion mit einem Zeitstempel gleich dem angegebenen Parameter vorgenommen werden.

  • Das Schlüsselwort BEFORE gibt an, dass sich die Anforderung auf einen Zeitpunkt unmittelbar vor dem angegebenen Parameter bezieht.

Bemerkung

Derzeit muss ein Stream für eine Tabelle erstellt werden, bevor Änderungsverfolgungsinformationen für die Tabelle aufgezeichnet werden. Wenn in dieser Vorschau zu dem in der AT | BEFORE-Klausel angegebenen Zeitpunkt in der Tabelle kein Stream vorhanden war, lieferte die CREATE STREAM-Anweisung einen Fehler. Ein Stream kann nur zu einem Zeitpunkt in der Vergangenheit erstellt werden, wenn vorher die Änderungsverfolgung aktiviert wurde.

APPEND_ONLY = TRUE | FALSE

Gibt an, ob es sich um einen Nur-Anfügen-Stream handelt. Nur-Anfügen-Streams verfolgen nur Zeileneinfügungen. Aktualisierungs- und Löschoperationen (einschließlich Tabellenkürzungen) werden nicht aufgezeichnet. Wenn beispielsweise 10 Zeilen in eine Tabelle eingefügt werden und dann 5 dieser Zeilen gelöscht werden, bevor der Offset für einen Nur-Anfügen-Stream erweitert wird, zeichnet der Stream 10 Zeilen auf.

Diese Art von Stream verbessert die Abfrageleistung gegenüber Standardstreams und ist sehr nützlich für das Extrahieren, Laden, Transformieren (ELT) und ähnliche Szenarien, die ausschließlich von Zeileneinfügungen abhängen.

Ein Standardstream verknüpft die gelöschten und eingefügten Zeilen im Änderungsset, um zu bestimmen, welche Zeilen gelöscht und welche aktualisiert wurden. Ein Nur-Anfügen-Stream gibt nur die angefügten Zeilen zurück und kann daher viel leistungsfähiger als ein Standardstream sein. Beispiel: Die Quelltabelle kann unmittelbar nach dem Verarbeiten der Zeilen eines Nur-Anfügen-Streams abgeschnitten werden, und das Löschen von Datensätzen produziert keinen Overhead, wenn der Stream das nächste Mal abgefragt oder verarbeitet wird.

Standard

FALSE

INSERT_ONLY = TRUE | FALSE

Gibt an, ob dies ein Nur-Einfügen-Stream ist. Bei Nur-Einfügen-Streams werden nur Zeileneinfügungen verfolgt. Es werden keine Löschoperationen aufgezeichnet, mit denen Zeilen aus einem eingefügten Set entfernt werden (d. h. NoOps). Wenn beispielsweise zwischen zwei beliebigen Offsets „File1“ aus dem Cloudspeicherort entfernt wird, auf den in der externen Tabelle verwiesen wird, und „File2“ hinzugefügt wird, gibt der Stream nur Datensätze für die Zeilen in „File2“ zurück. Im Gegensatz zur Verfolgung von CDC-Daten für Standardtabellen kann Snowflake nicht auf die historischen Datensätze von Dateien im Cloudspeicher zugreifen.

Bemerkung

Die Unterstützung von Nur-Einfügen-Tabellenstreams wird als Vorschau-Funktion bereitgestellt.

Standard

FALSE

COMMENT = 'Zeichenfolgenliteral'

Zeichenfolge (Literal), die einen Kommentar zur Tabelle enthält.

Standard: Kein Wert

Ausgabe

Die Ausgabe für einen Stream enthält dieselben Spalten wie die Quelltabelle sowie die folgenden zusätzlichen Spalten:

  • METADATA$ACTION: Gibt die Aktion an (INSERT oder DELETE).

  • METADATA$ISUPDATE: Gibt an, ob die aufgezeichnete Aktion (INSERT oder DELETE) Teil eines UPDATE ist, das auf die Zeilen in der Quelltabelle angewendet wird.

    Beachten Sie, dass Streams die Unterschiede zwischen zwei Offsets aufzeichnen. Wenn eine Zeile hinzugefügt und dann im aktuellen Offset aktualisiert wird, ist die Deltaänderung eine neue Zeile. Die METADATA$ISUPDATE-Zeile zeichnet einen FALSE-Wert auf.

  • METADATA$ROW_ID: Gibt die eindeutige und unveränderliche ID für die Zeile an, mit der Änderungen an bestimmten Zeilen im Laufe der Zeit verfolgt werden können.

Nutzungshinweise

  • Für das Erstellen eines Streams ist eine Rolle erforderlich, der die folgenden Berechtigungen zusammen mit den Berechtigungen USAGE für die Datenbank und das Schema explizit erteilt wurden:

    • Schema: CREATE STREAM

    • Quelltabelle: SELECT

  • Ein Stream kann mehrmals abgefragt werden, um mehrere Objekte in derselben Transaktion zu aktualisieren, und er gibt dieselben Daten zurück.

  • Die Stream-Position (d. h. der Offset) wird erhöht, wenn der Stream in einer DML -Anweisung verwendet wird. Die Position wird am Ende der Transaktion auf den Anfangszeitstempel der Transaktion aktualisiert. Der Stream beschreibt Änderungsdatensätze, die an der aktuellen Position des Streams beginnen und mit dem aktuellen Transaktionszeitstempel enden.

    Um sicherzustellen, dass verschiedene Anweisungen auf dieselben Änderungsdatensätze im Stream zugreifen, müssen Sie sie mit einer expliziten Transaktionsanweisung (BEGIN .. COMMIT) umgeben. Eine explizite Transaktion sperrt den Stream, sodass DML-Aktualisierungen der Quelltabelle erst nach dem Commit der Transaktion an den Stream gemeldet werden.

  • Bei Streams gibt es keine Fail-safe-Frist oder Time Travel-Aufbewahrungsfrist. Wenn ein Stream gelöscht wird, können die Metadaten in diesen Objekten nicht wiederhergestellt werden.

  • Wenn der erste Stream für eine Tabelle erstellt wird, wird ein Paar ausgeblendeter Spalten zur Tabelle hinzugefügt und beginnt mit dem Speichern von Metadaten zur Änderungsnachverfolgung. Die Spalten verbrauchen wenig Speicherplatz.

Beispiele

Erstellen eines Tabellenstreams

Erstellen Sie einen Stream für die Tabelle mytable:

CREATE STREAM mystream ON TABLE mytable;

Verwenden von Time Travel mit der Quelltabelle

Erstellen Sie einen Stream für die Tabelle mytable so, wie sie vor dem Datum und der Uhrzeit im angegebenen Zeitstempel existierte:

CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));

Erstellen Sie einen Stream für die Tabelle mytable so, wie sie genau zum Datum und zur Uhrzeit des angegebenen Zeitstempels existierte:

CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));

Erstellen Sie einen Stream für die Tabelle mytable so, wie sie vor 5 Minuten existiert hat:

CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);

Erstellen Sie einen Stream für die Tabelle mytable mit Transaktionen bis zu, aber ohne Änderungen, die von der angegebenen Transaktion vorgenommen wurden:

CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');

Erstellen eines Nur-Einfügen-Streams auf einer externen Tabelle

Erstellen Sie einen Stream auf einer externen Tabelle, und fragen Sie die Datensätze zur Änderungsdatenerfassung des Datenstreams ab, mit denen die Datensätze verfolgt werden, die zu den Metadaten der externen Tabelle hinzugefügten wurden:

-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE OR REPLACE EXTERNAL TABLE my_ext_table (
  date_part date as to_date(substr(metadata$filename, 11, 10), 'YYYY/MM/DD'),
  ts timestamp AS (value:time::timestamp),
  user_id varchar AS (value:userId::varchar),
  color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
  LOCATION=@my_ext_stage
  AUTO_REFRESH = false
  FILE_FORMAT=(TYPE=JSON);

-- Create a stream on the external table
CREATE OR REPLACE STREAM my_ext_table_stream ON EXTERNAL TABLE exttable_s3_part INSERT_ONLY = TRUE;

-- Execute SHOW streams
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on                    | name                   | database_name | schema_name | owner        | comment   | table_name                         | type  | stale | mode        |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM    | MYDB          | PUBLIC      | MYROLE       |           | MYDB.PUBLIC.EXTTABLE_S3_PART       | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+

-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.

-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;

-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE                                  | DATE_PART  | TS                      | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME                           |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| {                                      | 2020-08-05 | 2020-08-05 15:57:01.000 | user25  | green | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "green",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:57:01-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user25"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
| {                                      | 2020-08-05 | 2020-08-05 15:58:02.000 | user56  | brown | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "brown",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:58:02-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user56"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+