Suivi des modifications à l’aide de flux de table

Un objet de flux enregistre les modifications de langage de manipulation des données (DML) apportées aux tables, notamment les insertions, les mises à jour et les suppressions, ainsi que les métadonnées relatives à chaque modification, de sorte que des actions puissent être entreprises à l’aide des données modifiées. Ce processus est appelé « capture de données modifiées » (CDC). Un flux de table individuel suit les modifications apportées aux lignes d’une table source. Un flux de table (également appelé simplement « flux ») rend disponible une « table de modifications » de ce qui a changé, au niveau de la ligne, entre deux instants transactionnels d’une table. Cela permet d’interroger et de consommer une séquence d’enregistrements de modification de manière transactionnelle.

Dans ce chapitre :

Vue d’ensemble des flux de table

Une fois créé, un flux de table prend logiquement un instantané initial de chaque ligne de la table source en initialisant un point dans le temps (décalage) en tant que version transactionnelle actuelle de la table. Le système de suivi des modifications utilisé par le flux enregistre ensuite les informations relatives aux modifications DML (insertion, mise à jour, suppression) validées après la prise de cette capture instantanée. Les enregistrements de modifications fournissent l’état d’une ligne avant et après la modification. Les informations de modification reflètent la structure de colonne de la table source suivie et incluent des colonnes de métadonnées supplémentaires décrivant chaque événement de modification.

Notez que le flux lui-même ne contient aucune donnée de table. Un flux stocke uniquement le décalage de la table source et renvoie des enregistrements CDC en exploitant l’historique de gestion des versions de la table source. Lorsque le premier flux d’une table est créé, une paire de colonnes masquées est ajoutée à la table source et commence à stocker les métadonnées de suivi des modifications. Ces colonnes consomment une petite quantité de stockage. Les enregistrements CDC renvoyés lors de l’interrogation d’un flux reposent sur une combinaison du décalage stocké dans le flux et des métadonnées de suivi des modifications stockées dans la table.

Il peut être utile de considérer un flux comme un signet, indiquant un moment précis dans les pages d’un livre (c’est-à-dire le tableau source). Un signet peut être jeté et d’autres signets insérés à différents endroits dans un livre. De même, un flux peut être supprimé et d’autres flux créés au même moment ou à des moments différents (soit en créant les flux consécutivement à des moments différents, soit en utilisant Time Travel) pour consommer les enregistrements de modification d’une table à des décalages identiques ou différents.

Un exemple de consommateur d’enregistrements CDC est un pipeline de données dans lequel seules les données des tables de transfert qui ont été modifiées depuis la dernière extraction sont transformées et copiées dans d’autres tables.

Notez que, pour le moment, les flux ne peuvent pas suivre les modifications dans les vues matérialisées.

Table et gestion de versions

Un flux conserve un point de temps dans la chronologie transactionnelle des versions de la table source, appelé décalage, qui commence au point transactionnel de la dernière utilisation du contenu du flux à l’aide d’une instruction DML. Le flux peut fournir l’ensemble des modifications du décalage actuel à l’heure de la transaction actuelle de la table source (c’est-à-dire la version actuelle de la table). Le flux ne conserve que le delta des modifications ; si plusieurs instructions DML modifient une ligne, le flux ne contient que la dernière action effectuée sur cette ligne.

Plusieurs requêtes peuvent consommer indépendamment les mêmes données de modification d’un flux sans modifier le décalage. Un flux avance le décalage uniquement lorsqu’il est utilisé dans une transaction DML, y compris les transactions autocommit (c’est-à-dire exécutées sans démarrer explicitement une transaction. Par défaut, une transaction est automatiquement validée en cas de succès ou annulée en cas d’échec à la fin de l’instruction. Ce comportement est contrôlé avec le paramètre AUTOCOMMIT.)

Lorsqu’une instruction SQL interroge un flux dans une transaction explicite, le flux est interrogé au point d’avance du flux (c’est-à-dire l’horodatage) lorsque la transaction a commencé plutôt que lorsque l’instruction a été exécutée. Ce comportement concerne à la fois les instructions DML et CREATE TABLE … AS SELECT (CTAS) qui remplissent une nouvelle table avec des lignes d’un flux existant.

Le décalage actuel pour un flux peut être déterminé en interrogeant la fonction SYSTEM$STREAM_GET_TABLE_TIMESTAMP .

Une instruction DML qui sélectionne un flux utilise toutes les données de modification du flux tant que la transaction est validée. Pour vous assurer que plusieurs instructions ont accès aux mêmes enregistrements de modification dans le flux, entourez-les d’une instruction de transaction explicite (BEGIN .. COMMIT). Cela verrouille le flux. Les mises à jour DML de la table source dans les transactions parallèles sont suivies par le système de suivi des modifications mais ne mettent pas à jour le flux tant que l’instruction de transaction explicite n’est pas validée et que les données de modification existantes ne sont pas utilisées.

Isolement de lecture répétable

Les flux prennent en charge l’isolation de lecture répétable. En mode de lecture répétable, plusieurs instructions SQL d’une transaction voient le même jeu d’enregistrements dans un flux. Cela diffère du mode de lecture validée pris en charge pour les tables, dans lequel les instructions voient toutes les modifications apportées par les instructions précédentes exécutées dans la même transaction, même si ces modifications ne sont pas encore validées.

Les enregistrements delta renvoyés par les flux d’une transaction correspondent à la plage allant de la position actuelle du flux jusqu’à l’heure de début de la transaction. La position du flux avance à l’heure de début de la transaction si la transaction est validée ; sinon, elle reste à la même position.

Prenons l’exemple suivant :

Durée

Transaction 1

Transaction 2

1

Commencez une transaction.

2

Interrogez le flux s1 sur la table t1. Le flux renvoie les enregistrements de capture de données modifiées . entre la position actuelle et l’heure de début de la transaction 1. Si le flux est utilisé dans une instruction DML ., il est alors verrouillé pour éviter les modifications par des transactions simultanées.

3

Mettez à jour des lignes dans une table t1.

4

Interrogez le flux s1. Renvoie le même état de flux que lorsqu’il a été utilisé à Heure 2.

5

Validez la transaction. Si le flux a été utilisé dans des instructions DML dans la transaction, la position du flux passe à l’heure de début de la transaction.

6

Commencez une transaction.

7

Interrogez le flux s1. Les résultats incluent les modifications de table validées par la transaction 1.

Dans la transaction 1, toutes les requêtes pour diffuser s1 affichent le même jeu d’enregistrements. Les modifications DML apportées à la table t1 sont enregistrées dans le flux uniquement lorsque la transaction est validée.

Dans la transaction 2, les requêtes sur le flux affichent les modifications enregistrées dans la table dans la Transaction 1. Notez que si la Transaction 2 avait commencé avant que la Transaction 1 soit validée, les requêtes adressées au flux auraient renvoyé un instantané du flux de la position du flux au début de la Transaction 2 et n’afficheraient aucune modification validée par la Transaction 1.

Colonnes de flux

Un flux stocke des données sous la même forme que la table source (c’est-à-dire les mêmes noms de colonnes et le même ordre) avec les colonnes supplémentaires suivantes :

METADATA$ACTION

Indique l’opération DML (INSERT, DELETE) enregistrée.

METADATA$ISUPDATE

Indique si l’opération faisait partie d’une instruction UPDATE. Les mises à jour des lignes de la table source sont représentées par une paire d’enregistrements DELETE et INSERT dans le flux avec une colonne de métadonnées METADATA$ISUPDATE définie sur TRUE.

Notez que les flux enregistrent les différences entre deux décalages. Si une ligne est ajoutée puis mise à jour dans le décalage actuel, la modification delta est une nouvelle ligne. La ligne METADATA$ISUPDATE enregistre une valeur FALSE.

METADATA$ROW_ID

Spécifie l” ID unique et immuable pour la ligne, qui peut être utilisé pour suivre les modifications apportées à des lignes spécifiques au fil du temps.

Types de flux

Les types de flux suivants sont disponibles en fonction des métadonnées enregistrées par chacun :

Standard

Un flux de table standard (c’est-à-dire delta) suit toutes les modifications DML de la table source, y compris les insertions, les mises à jour et les suppressions (y compris les troncatures de table). Ce type de flux effectue une jointure sur les lignes insérées et supprimées dans le jeu de modifications pour fournir le delta de niveau ligne. En tant qu’effet net, par exemple, une ligne qui est insérée puis supprimée entre deux points de temps transactionnels dans une table est supprimée dans le delta (c’est-à-dire qu’elle n’est pas renvoyée lorsque le flux est interrogé).

Ajouter uniquement

Un flux de table d’ajout uniquement suit les insertions de ligne uniquement. Les opérations de mise à jour et de suppression (y compris les troncations de table) ne sont pas enregistrées. Par exemple, si 10 lignes sont insérées dans une table et que 5 de ces lignes sont supprimées avant que le décalage pour un flux d’ajout uniquement soit avancé, le flux enregistre 10 lignes.

Un flux d’ajout uniquement renvoie les lignes ajoutées uniquement et peut donc être beaucoup plus performant qu’un flux standard pour l’extraction, le chargement et la transformation (ELT) et des scénarios similaires qui dépendent exclusivement des insertions de lignes. Par exemple, la table source peut être tronquée immédiatement après la consommation des lignes d’un flux d’ajout uniquement, et les suppressions d’enregistrement ne contribuent pas à la surcharge la prochaine fois que le flux est interrogé ou consommé.

Insertion uniquement

Pris en charge sur les tables externes uniquement. Un flux à insertion uniquement suit uniquement les insertions de lignes. Il n’enregistre pas les opérations de suppression qui suppriment des lignes d’un ensemble inséré (c’est-à-dire sans opération). Par exemple, entre deux décalages, si le fichier1 est supprimé de l’emplacement de stockage Cloud référencé par la table externe et que le fichier2 est ajouté, le flux renvoie les enregistrements pour les lignes du fichier2 uniquement. Contrairement au suivi des données CDC pour les tables standard, Snowflake ne peut pas accéder aux enregistrements historiques des fichiers stockés dans le Cloud.

Note

La prise en charge des flux de table à insertion uniquement est fournie sous la forme d’une fonctionnalité en avant-première.

Flux de données

Le diagramme suivant montre comment le contenu d’un flux de table standard change lorsque les lignes de la table source sont mises à jour. Chaque fois qu’une instruction DML consomme le contenu du flux, la position du flux avance pour suivre le prochain ensemble de modifications de la table DML (c’est-à-dire les modifications apportées à une version de table) :

Streams Example

Période de conservation des données et obsolescence

Un flux devient obsolète lorsque son décalage est en dehors de la période de conservation des données pour sa table source. Lorsqu’un flux devient obsolète, les données historiques de la table source ne sont plus accessibles, y compris les enregistrements de modifications non utilisés. Pour suivre les nouveaux enregistrements de modification d’une table, recréez le flux (à l’aide de CREATE STREAM). Pour empêcher un flux de devenir périmé, consommez les enregistrements de flux dans une transaction pendant la période de conservation de la table. Pour plus d’informations sur la période de conservation des données, voir Comprendre et utiliser la fonction « Time Travel ».

Si la période de conservation des données pour une table source est inférieure à 14 jours et qu’un flux n’a pas été consommé, Snowflake prolonge temporairement cette période pour éviter qu’elle ne devienne obsolète. La période est étendue au décalage du flux, jusqu’à un maximum de 14 jours, quelle que soit l” édition Snowflake de votre compte. Lorsque le flux est consommé, la période de conservation étendue des données est réduite à la période par défaut de la table.

Pour déterminer si un flux est devenu obsolète, exécutez la commande DESCRIBE STREAM ou SHOW STREAMS. Dans la sortie de la commande, lorsque la valeur de la colonne STALE pour le flux est TRUE, le flux est périmé.

Note

Actuellement, lorsqu’une base de données ou un schéma contenant une table source et un flux est cloné(e), tous les enregistrements non consommés dans le flux (dans le clone) sont inaccessibles. Ce comportement est cohérent avec Time Travel pour les tables. Si une table est clonée, les données historiques pour le clone de table commencent à l’heure/le moment où le clone a été créé.

Consommation de flux à l’aide de tâches

Plusieurs tâches qui consomment des données de modification à partir d’un flux de table unique récupèrent différents deltas. Lorsqu’une tâche consomme les données de modification dans un flux à l’aide d’une instruction DML, le flux avance le décalage. Les données de modification ne sont plus disponibles pour la prochaine tâche à consommer. Actuellement, nous recommandons qu’une seule tâche consomme les données de modification d’un flux. Plusieurs flux peuvent être créés pour la même table et consommés par différentes tâches.

Clause CHANGES : alternative en lecture seule aux flux

Comme alternative aux flux, Snowflake prend en charge l’interrogation des métadonnées de suivi des modifications pour les tables à l’aide de la clause CHANGES pour les instructions SELECT. La clause CHANGES permet d’interroger les métadonnées de suivi des modifications entre deux points dans le temps sans avoir à créer un flux de table avec un décalage transactionnel explicite. L’utilisation de la clause CHANGES ne fait pas avancer le décalage (c’est-à-dire consommer des enregistrements). Plusieurs requêtes peuvent récupérer les métadonnées de suivi des modifications entre différents points de départ et points de terminaison transactionnels. Cette option nécessite de spécifier un point de départ transactionnel pour les métadonnées à l’aide d’une clause AT | BEFORE ; le point de terminaison de l’intervalle de suivi des modifications peut être défini à l’aide de la clause END facultative.

Un flux stocke la version transactionnelle actuelle d’une table et est la source appropriée d’enregistrements CDC dans la plupart des scénarios. Pour les scénarios peu fréquents qui nécessitent de gérer le décalage pendant des périodes arbitraires, la clause CHANGES est disponible pour votre usage.

Note

Actuellement, l’un des éléments suivants doit correspondre avant que les métadonnées de suivi des modifications soient enregistrées pour une table :

  • Le suivi des modifications est activé sur la table (en utilisant ALTER TABLE … CHANGE_TRACKING = TRUE).

  • Un flux est créé pour la table (à l’aide de CREATE STREAM).

L’une ou l’autre option ajoute une paire de colonnes masquées à la table et commence à stocker les métadonnées de suivi des modifications. Les colonnes consomment une petite quantité de stockage.

Aucune métadonnée de suivi des modifications du tableau n’est disponible pendant la période précédant la satisfaction de l’une de ces conditions.

Flux sur les tables partagées

La création de flux sur des tables partagées permet aux consommateurs de données de suivre les modifications apportées au langage de manipulation de données (DML) dans ces tables. Cette fonctionnalité est similaire à la création et à l’utilisation de flux sur des tables « locales » (c’est-à-dire dans le même compte que le flux). Cette rubrique décrit les étapes permettant aux fournisseurs de données de configurer des tables partagées pour la création de flux et aux consommateurs de créer les flux.

Pour obtenir des instructions, voir .

Facturation des flux

Comme décrit dans Période de conservation des données et obsolescence (dans ce chapitre), lorsqu’un flux n’est pas consommé régulièrement, Snowflake prolonge temporairement la période de conservation des données pour la table source. Si la période de conservation des données de la table est inférieure à 14 jours, alors en arrière-plan, elle est étendue à la valeur la plus petite liée au décalage transactionnel du flux ou à 14 jours (si la période de conservation des données pour la table est inférieure à 14 jours), quelle que soit l” édition Snowflake de votre compte.

L’étendue de la période de conservation des données nécessite un stockage supplémentaire qui se reflétera dans vos frais de stockage mensuels.

Le coût principal associé à un flux est le temps de traitement utilisé par un entrepôt virtuel pour interroger le flux. Ces frais apparaissent sur votre facture en tant que crédits Snowflake familiers.

DDL de flux

Pour faciliter la création et la gestion des flux, Snowflake fournit l’ensemble suivant de commandes spéciales DDL :

De plus, les fournisseurs peuvent afficher, accorder ou révoquer l’accès aux objets de base de données nécessaires pour ELT en utilisant le DDL de contrôle d’accès standard suivant :

Privilèges d’accès requis

La création et la gestion de flux requièrent un rôle avec au minimum les autorisations de rôle suivantes :

Objet

Privilège

Remarques

Base de données

USAGE

Schéma

USAGE, CREATE STREAM

Table

SELECT

L’interrogation d’un flux nécessite un rôle avec au minimum les autorisations de rôle suivantes :

Objet

Privilège

Remarques

Base de données

USAGE

Schéma

USAGE

Flux

SELECT

Table

SELECT

Exemples

Exemple 1

L’exemple suivant montre comment le contenu d’un flux change quand des instructions DML s’exécutent sur la table source :

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Create a stream to track changes to date in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
  );

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members who joined the gym after a free trial period ended:
MERGE INTO members m
  USING (
    SELECT id, dt
    FROM signup s
    WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
    ON m.id = s.id
  WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

Exemple 2

L’exemple suivant montre les différences de comportement entre les flux standard (delta) et les flux d’ajout uniquement :

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace  stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace  stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = '0';

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the 3 streams.
create or replace  table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

L’exemple suivant montre comment utiliser les flux dans les processus ELT (extraire, charger, transformer). Dans cet exemple, les nouvelles données insérées dans une table de transfert sont suivies par un flux. Un ensemble d’instructions SQL transforme et insère le contenu du flux dans un ensemble de tables de production :

Exemple 3

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
    id number(8),
    ts TIMESTAMP
    );

CREATE OR REPLACE TABLE data_prod2 (
    id number(8),
    color VARCHAR,
    num NUMBER
    );

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

--  Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp (t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+-------------------------+
|   ID | TS                      |
|------+-------------------------|
| 7077 | 2018-08-14 20:57:01.000 |
| 7078 | 2018-08-14 21:07:26.000 |
+------+-------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+