Exemplos de pipeline contínuo de dados¶
Este tópico fornece exemplos práticos de casos de uso para pipelines de dados.
Neste tópico:
Pré-requisitos¶
A função utilizada para executar as instruções SQL nestes exemplos requer os seguintes privilégios de controle de acesso:
EXECUTE TASK
Privilégio EXECUTE TASK global para executar tarefas
USAGE
Privilégio USAGE no banco de dados e o esquema nos quais as instruções SQL são executadas, bem como sobre o warehouse que executa quaisquer tarefas nestes exemplos.
CREATE object
Diversos privilégios
CREATE object
no esquema em que as instruções SQL são executadas, para criar objetos tais como tabelas, fluxos e tarefas.
Para obter mais informações sobre o controle de acesso no Snowflake, consulte Visão geral do controle de acesso.
Transformando dados JSON carregados em um cronograma¶
O exemplo seguinte carrega dados JSON brutos em uma única tabela de destino chamada raw
. Duas tarefas consultam fluxos de tabela criados na tabela raw
e inserem subconjuntos de linhas em várias tabelas. Uma vez que cada tarefa consome os registros de captura de dados de alteração em um fluxo de tabela, são necessários vários fluxos.
-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);
-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;
-- Create a second stream to capture inserts to the landing table.
-- A second task will consume another set of columns from this stream.
create or replace stream rawstream2 on table raw;
-- Create a table that stores the names of office visitors identified in the raw data.
create or replace table names (id int, first_name string, last_name string);
-- Create a table that stores the visitation dates of office visitors identified in the raw data.
create or replace table visits (id int, dt date);
-- Create a task that inserts new name records from the rawstream1 stream into the names table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_names
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
merge into names n
using (select var:id id, var:fname fname, var:lname lname from rawstream1) r1 on n.id = to_number(r1.id)
when matched then update set n.first_name = r1.fname, n.last_name = r1.lname
when not matched then insert (id, first_name, last_name) values (r1.id, r1.fname, r1.lname)
;
-- Create another task that merges visitation records from the rawstream1 stream into the visits table
-- every minute when the stream contains records.
-- Records with new IDs are inserted into the visits table;
-- Records with IDs that exist in the visits table update the DT column in the table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_visits
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream2')
as
merge into visits v
using (select var:id id, var:visit_dt visit_dt from rawstream2) r2 on v.id = to_number(r2.id)
when matched then update set v.dt = r2.visit_dt
when not matched then insert (id, dt) values (r2.id, r2.visit_dt)
;
-- Resume both tasks.
alter task raw_to_names resume;
alter task raw_to_visits resume;
-- Insert a set of records into the landing table.
insert into raw
select parse_json(column1)
from values
('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'),
('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}');
-- Query the change data capture record in the table streams
select * from rawstream1;
select * from rawstream2;
-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);
-- Query the table streams again.
-- Records should be consumed and no longer visible in streams.
-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;
-- Insert another set of records into the landing table.
-- The records include both new and existing IDs in the target tables.
insert into raw
select parse_json(column1)
from values
('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-25"}'),
('{"id": "789","fname": "Ana","lname": "Glass","visit_dt": "2019-09-25"}');
-- Wait for the tasks to run.
call system$wait(70);
-- Records should be consumed and no longer visible in streams.
select * from rawstream1;
select * from rawstream2;
-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;
Descarregar de dados em um cronograma¶
No exemplo a seguir, os registros de captura de dados de alteração em um fluxo são descarregados em um preparo interno (ou seja, do Snowflake).
-- Use the landing table from the previous example.
-- Alternatively, create a landing table.
-- Snowpipe could load data into this table.
create or replace table raw (id int, type string);
-- Create a stream on the table. We will use this stream to feed the unload command.
create or replace stream rawstream on table raw;
-- Create a task that executes the COPY statement every minute.
-- The COPY statement reads from the stream and loads into the table stage for the landing table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task unloadtask
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('RAWSTREAM')
as
copy into @%raw/rawstream from rawstream overwrite=true;
;
-- Resume the task.
alter task unloadtask resume;
-- Insert raw data into the landing table.
insert into raw values (3,'processed');
-- Query the change data capture record in the table stream
select * from rawstream;
-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);
-- Records should be consumed and no longer visible in the stream.
select * from rawstream;
-- Verify the COPY statement unloaded a data file into the table stage.
ls @%raw;
Atualização dos metadados de tabela externa em um cronograma¶
No exemplo a seguir, os metadados para uma tabela externa chamada mydb.myschema.exttable
são atualizados (usando ALTER EXTERNAL TABLE … REFRESH) em um cronograma.
Nota
Quando uma tabela externa é criada, o parâmetro AUTO_REFRESH é definido como TRUE
por padrão. Recomendamos que você aceite este valor padrão para tabelas externas que fazem referência a arquivos de dados em estágios do Amazon S3 ou do Microsoft Azure. Entretanto, a opção de atualização automática não está disponível atualmente para tabelas externas que fazem referência a preparos de Google Cloud Storage. No caso destas tabelas externas, pode ser útil atualizar manualmente os metadados em um cronograma.
-- Create a task that executes an ALTER EXTERNAL TABLE ... REFRESH statement every 5 minutes.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
CREATE TASK exttable_refresh_task
WAREHOUSE=mywh
SCHEDULE='5 minutes'
AS
ALTER EXTERNAL TABLE mydb.myschema.exttable REFRESH;