Catégories :

DDL de pipeline de données

CREATE STREAM

Crée un nouveau flux dans le schéma actuel/spécifié ou remplace un flux existant. Un flux enregistre les modifications apportées au langage de manipulation de données (DML) dans une table, y compris des informations sur les insertions, les mises à jour et les suppressions. La table pour laquelle les modifications sont enregistrées s’appelle la table source.

En outre, cette commande prend en charge les variantes suivantes :

  • CREATE STREAM … CLONE (crée un clone d’un flux existant)

Voir aussi :

ALTER STREAM , DROP STREAM , SHOW STREAMS

Dans ce chapitre :

Syntaxe

-- Table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON TABLE <table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ APPEND_ONLY = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

-- External table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON EXTERNAL TABLE <external_table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ INSERT_ONLY = TRUE ]
  [ COMMENT = '<string_literal>' ]

Syntaxe des variantes

CREATE STREAM … CLONE

Crée un nouveau flux avec la même définition que le flux source. Le clone hérite du décalage actuel (c’est-à-dire de la version transactionnelle actuelle de la table) du flux source.

CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream>
  [ COPY GRANTS ]
  [ ... ]

Pour plus d’informations sur le clonage, voir CREATE <objet> … CLONE.

Paramètres requis

nom

Chaîne qui indique l’identificateur (c’est-à-dire le nom) du flux ; doit être unique pour le schéma dans lequel le flux est créé.

De plus, l’identificateur doit commencer par un caractère alphabétique et ne peut pas contenir d’espaces ou de caractères spéciaux à moins que toute la chaîne d’identificateur soit délimitée par des guillemets doubles (p. ex. "My object"). Les identificateurs entre guillemets doubles sont également sensibles à la casse.

Pour plus de détails, voir Exigences relatives à l’identificateur.

nom_table

Chaîne spécifiant l’identificateur (nom) de la table dont les modifications sont suivies par le flux (c’est-à-dire la table source).

Contrôle d’accès

Pour interroger un flux, un rôle doit disposer du privilège SELECT sur la table sous-jacente.

nom_table_externe

Chaîne spécifiant l’identificateur (c’est-à-dire le nom) de la table externe dont les modifications sont suivies par le flux (c’est-à-dire la table externe source).

Contrôle d’accès

Pour interroger un flux, un rôle doit disposer du privilège SELECT sur la table externe sous-jacente.

Paramètres facultatifs

COPY GRANTS

Spécifie de conserver les droits d’accès du flux d’origine lorsqu’un nouveau flux est créé à l’aide de l’une des variables CREATE STREAM suivantes :

  • CREATE OR REPLACE STREAM

  • CREATE STREAM … CLONE

Ce paramètre copie toutes les autorisations, excepté OWNERSHIP, du flux existant vers le nouveau flux. Par défaut, le rôle qui exécute la commande CREATE STREAM possède le nouveau flux.

Note

  • Si l’instruction CREATE STREAM fait référence à plusieurs flux (p. ex. create or replace stream t1 clone t2;), la clause COPY GRANTS donne priorité au flux à remplacer.

  • La sortie SHOW GRANTS pour le flux de remplacement liste le concessionnaire des privilèges copiés comme le rôle qui a exécuté l’instruction CREATE STREAM, avec l’horodatage courant lorsque l’instruction a été exécutée.

  • L’opération de copie des accords s’effectue atomiquement dans la commande CREATE STREAM (c’est-à-dire dans la même transaction).

Note

Ce paramètre n’est pas pris en charge actuellement.

AT | BEFORE TIMESTAMP => <horodatage> | OFFSET => <time_difference> | STATEMENT => <id>

Crée un flux sur une table à un moment précis dans le passé (à l’aide de la fonction Time Travel). La clause AT | BEFORE détermine le point du passé à partir duquel des données historiques sont demandées pour la table :

  • Le mot clé AT spécifie que la requête inclut tous les changements apportés par une instruction ou une transaction dont l’horodatage est égal au paramètre spécifié.

  • Le mot clé BEFORE spécifie que la requête se réfère à un point précédant immédiatement le paramètre spécifié.

Note

Actuellement, un flux doit être créé sur une table avant que les informations de suivi des modifications ne soient enregistrées pour la table. Si aucun flux n’existait dans la table à un moment donné dans le passé, indiqué dans la clause AT | BEFORE, l’instruction CREATE STREAM échoue. Aucun flux ne peut être créé à un moment antérieur à l’enregistrement du suivi des modifications.

APPEND_ONLY = TRUE | FALSE

Spécifie s’il s’agit d’un flux d’ajout uniquement. Les flux d’ajout uniquement suivent uniquement les insertions de ligne. Les opérations de mise à jour et de suppression (y compris les troncations de table) ne sont pas enregistrées. Par exemple, si 10 lignes sont insérées dans une table et que 5 de ces lignes sont supprimées avant que le décalage pour un flux d’ajout uniquement soit avancé, le flux enregistre 10 lignes.

Ce type de flux améliore les performances des requêtes par rapport aux flux standard et est très utile pour l’extraction, le chargement et la transformation (ELT), et pour des scénarios similaires qui dépendent exclusivement des insertions de ligne.

Un flux standard joint les lignes supprimées et insérées dans l’ensemble de modifications pour déterminer quelles lignes ont été supprimées et lesquelles ont été mises à jour. Un flux d’ajout uniquement renvoie les lignes ajoutées uniquement et peut donc être beaucoup plus performant qu’un flux standard. Par exemple, la table source peut être tronquée immédiatement après la consommation des lignes d’un flux d’ajout uniquement, et les suppressions d’enregistrement ne contribuent pas à la surcharge la prochaine fois que le flux est interrogé ou consommé.

Par défaut

FALSE

INSERT_ONLY = TRUE | FALSE

Spécifie s’il s’agit d’un flux à insertion uniquement. Les flux à insertion uniquement suivent uniquement les insertions de lignes ; ils n’enregistrent pas les opérations de suppression qui suppriment des lignes d’un ensemble inséré (c’est-à-dire sans opération). Par exemple, entre deux décalages, si le fichier1 est supprimé de l’emplacement de stockage Cloud référencé par la table externe et que le fichier2 est ajouté, le flux renvoie les enregistrements pour les lignes du fichier2 uniquement. Contrairement au suivi des données CDC pour les tables standard, Snowflake ne peut pas accéder aux enregistrements historiques des fichiers stockés dans le Cloud.

Note

La prise en charge des flux de table à insertion uniquement est fournie sous la forme d’une fonctionnalité en avant-première.

Par défaut

FALSE

COMMENT = 'litéral_chaine'

Chaîne (littéral) qui spécifie un commentaire pour la table.

Par défaut : aucune valeur

Sortie

La sortie d’un flux comprend les mêmes colonnes que la table source, ainsi que les colonnes supplémentaires suivantes :

  • METADATA$ACTION : spécifie l’action (INSERT ou DELETE).

  • METADATA$ISUPDATE : spécifie si l’action enregistrée (INSERT ou DELETE) fait partie d’un UPDATE appliqué aux lignes de la table source.

    Notez que les flux enregistrent les différences entre deux décalages. Si une ligne est ajoutée puis mise à jour dans le décalage actuel, la modification delta est une nouvelle ligne. La ligne METADATA$ISUPDATE enregistre une valeur FALSE.

  • METADATA$ROW_ID : spécifie l’ID unique et immuable pour la ligne, qui peut être utilisé pour suivre les modifications apportées à des lignes spécifiques au fil du temps.

Notes sur l’utilisation

  • La création d’un flux nécessite un rôle explicitement doté des privilèges suivants, ainsi que des privilèges USAGE sur la base de données et le schéma :

    • Schéma : CREATE STREAM

    • Table source : SELECT

  • Un flux peut être interrogé plusieurs fois pour mettre à jour plusieurs objets dans la même transaction et il renverra les mêmes données.

  • La position du flux (c’est-à-dire le décalage) est avancée lorsque le flux est utilisé dans une instruction DML. La position est mise à jour à la fin de la transaction avec l’horodatage de début de la transaction. Le flux décrit les enregistrements de modification commençant à la position actuelle du flux et se terminant à l’horodatage transactionnel actuel.

    Pour vous assurer que plusieurs instructions ont accès aux mêmes enregistrements de modification dans le flux, entourez-les d’une instruction de transaction explicite (BEGIN .. COMMIT). Une transaction explicite verrouille le flux, de sorte que les mises à jour de la table source DML ne soient pas signalées au flux tant que la transaction n’est pas validée.

  • Les flux n’ont pas de période de conservation Fail-safe ou Time Travel. Les métadonnées de ces objets ne peuvent pas être récupérées si un flux est détruit.

  • Lorsque le premier flux d’une table est créé, une paire de colonnes masquées est ajoutée à la table et commence à stocker les métadonnées de suivi des modifications. Les colonnes consomment une petite quantité de stockage.

Exemples

Création d’un flux de table

Créer un flux sur la table mytable :

CREATE STREAM mystream ON TABLE mytable;

Utilisation de Time Travel avec la table source

Créer un flux sur la table mytable tel qu’il existait avant la date et l’heure dans l’horodatage spécifié :

CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));

Créer un flux sur la table mytable tel qu’il existait exactement à la date et à l’heure de l’horodatage spécifié :

CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));

Créer un flux sur la table mytable tel qu’il existait il y a cinq minutes :

CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);

Créer un flux sur la table mytable avec les transactions jusqu’alors, mais sans inclure les modifications apportées par la transaction spécifiée :

CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');

Création d’un flux à insertion uniquement sur une table externe

Créez un flux de table externe et interrogez les enregistrements de capture de données modifiées dans le flux, qui suivent les enregistrements ajoutés aux métadonnées de la table externe :

-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE OR REPLACE EXTERNAL TABLE my_ext_table (
  date_part date as to_date(substr(metadata$filename, 11, 10), 'YYYY/MM/DD'),
  ts timestamp AS (value:time::timestamp),
  user_id varchar AS (value:userId::varchar),
  color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
  LOCATION=@my_ext_stage
  AUTO_REFRESH = false
  FILE_FORMAT=(TYPE=JSON);

-- Create a stream on the external table
CREATE OR REPLACE STREAM my_ext_table_stream ON EXTERNAL TABLE exttable_s3_part INSERT_ONLY = TRUE;

-- Execute SHOW streams
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on                    | name                   | database_name | schema_name | owner        | comment   | table_name                         | type  | stale | mode        |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM    | MYDB          | PUBLIC      | MYROLE       |           | MYDB.PUBLIC.EXTTABLE_S3_PART       | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+

-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.

-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;

-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE                                  | DATE_PART  | TS                      | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME                           |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| {                                      | 2020-08-05 | 2020-08-05 15:57:01.000 | user25  | green | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "green",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:57:01-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user25"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
| {                                      | 2020-08-05 | 2020-08-05 15:58:02.000 | user56  | brown | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "brown",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:58:02-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user56"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+