Exemples avec un pipeline de données continues

Cette rubrique fournit des exemples pratiques de cas d’utilisation pour des pipelines de données.

Dans ce chapitre :

Conditions préalables

Le rôle utilisé pour exécuter les instructions SQL dans ces exemples nécessite les privilèges de contrôle d’accès suivants :

EXECUTE TASK

Privilège EXECUTE TASK global pour exécuter des tâches

USAGE

Privilège USAGE sur la base de données et le schéma dans lesquels les instructions SQL sont exécutées, ainsi que sur l’entrepôt qui exécute les tâches dans ces exemples.

CREATE object

Divers privilèges CREATE object sur le schéma dans lequel les instructions SQL sont exécutées, pour créer des objets tels que des tables, des flux et des tâches.

Pour plus d’informations sur le contrôle d’accès dans Snowflake, voir Aperçu du contrôle d’accès.

Transformation des données JSON chargées dans une planification

L’exemple suivant charge des données JSON brutes dans une seule table de destination nommée raw. Deux tâches interrogent les flux de table créés sur la table raw et insèrent des sous-ensembles de lignes dans plusieurs tables. Étant donné que chaque tâche consomme les enregistrements de capture de données modifiées dans un flux de table, plusieurs flux sont requis.

-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);

-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;

-- Create a second stream to capture inserts to the landing table.
-- A second task will consume another set of columns from this stream.
create or replace stream rawstream2 on table raw;

-- Create a table that stores the names of office visitors identified in the raw data.
create or replace table names (id int, first_name string, last_name string);

-- Create a table that stores the visitation dates of office visitors identified in the raw data.
create or replace table visits (id int, dt date);

-- Create a task that inserts new name records from the rawstream1 stream into the names table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_names
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
merge into names n
  using (select var:id id, var:fname fname, var:lname lname from rawstream1) r1 on n.id = to_number(r1.id)
  when matched then update set n.first_name = r1.fname, n.last_name = r1.lname
  when not matched then insert (id, first_name, last_name) values (r1.id, r1.fname, r1.lname)
;

-- Create another task that merges visitation records from the rawstream1 stream into the visits table
-- every minute when the stream contains records.
-- Records with new IDs are inserted into the visits table;
-- Records with IDs that exist in the visits table update the DT column in the table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_visits
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream2')
as
merge into visits v
  using (select var:id id, var:visit_dt visit_dt from rawstream2) r2 on v.id = to_number(r2.id)
  when matched then update set v.dt = r2.visit_dt
  when not matched then insert (id, dt) values (r2.id, r2.visit_dt)
;

-- Resume both tasks.
alter task raw_to_names resume;
alter task raw_to_visits resume;

-- Insert a set of records into the landing table.
insert into raw
  select parse_json(column1)
  from values
  ('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'),
  ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}');

-- Query the change data capture record in the table streams
select * from rawstream1;
select * from rawstream2;

-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- Query the table streams again.
-- Records should be consumed and no longer visible in streams.

-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;

-- Insert another set of records into the landing table.
-- The records include both new and existing IDs in the target tables.
insert into raw
  select parse_json(column1)
  from values
  ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-25"}'),
  ('{"id": "789","fname": "Ana","lname": "Glass","visit_dt": "2019-09-25"}');

-- Wait for the tasks to run.
call system$wait(70);

-- Records should be consumed and no longer visible in streams.
select * from rawstream1;
select * from rawstream2;

-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;
Copy

Déchargement des données dans une planification

L’exemple suivant décharge les enregistrements de capture de données modifiées dans un flux dans une zone de préparation interne (c’est-à-dire Snowflake).

-- Use the landing table from the previous example.
-- Alternatively, create a landing table.
-- Snowpipe could load data into this table.
create or replace table raw (id int, type string);

-- Create a stream on the table.  We will use this stream to feed the unload command.
create or replace stream rawstream on table raw;

-- Create a task that executes the COPY statement every minute.
-- The COPY statement reads from the stream and loads into the table stage for the landing table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task unloadtask
warehouse = mywh
schedule = '1 minute'
when
  system$stream_has_data('RAWSTREAM')
as
copy into @%raw/rawstream from rawstream overwrite=true;
;

-- Resume the task.
alter task unloadtask resume;

-- Insert raw data into the landing table.
insert into raw values (3,'processed');

-- Query the change data capture record in the table stream
select * from rawstream;

-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- Records should be consumed and no longer visible in the stream.
select * from rawstream;

-- Verify the COPY statement unloaded a data file into the table stage.
ls @%raw;
Copy

Actualisation des métadonnées de table externe dans une planification

L’exemple suivant actualise les métadonnées d’une table externe nommée mydb.myschema.exttable (à l’aide de ALTER EXTERNAL TABLE … REFRESH) selon une planification.

Note

Lorsqu’une table externe est créée, le paramètre AUTO_REFRESH est défini sur TRUE par défaut. Nous vous recommandons d’accepter cette valeur par défaut pour les tables externes qui référencent les fichiers de données dans les zones de préparation Amazon S3 ou Microsoft Azure. Cependant, l’option d’actualisation automatique n’est pas disponible actuellement pour les tables externes qui font référence aux zones de préparation de Google Cloud Storage. Pour ces tables externes, l’actualisation manuelle des métadonnées selon une planification peut être utile.

-- Create a task that executes an ALTER EXTERNAL TABLE ... REFRESH statement every 5 minutes.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
CREATE TASK exttable_refresh_task
WAREHOUSE=mywh
SCHEDULE='5 minutes'
  AS
ALTER EXTERNAL TABLE mydb.myschema.exttable REFRESH;
Copy