Beispiele für kontinuierliche Datenpipelines

Unter diesem Thema finden Sie praktische Beispiele für Anwendungsfälle von Datenpipelines.

Unter diesem Thema:

Voraussetzungen

Die Rolle, die zum Ausführen der SQL-Anweisungen in diesen Beispielen verwendet wird, erfordert die folgenden Zugriffssteuerungsrechte:

EXECUTE TASK

Globale EXECUTE TASK-Berechtigung zum Ausführen von Aufgaben

USAGE

USAGE-Berechtigungen für die Datenbank und das Schema, in denen die SQL-Anweisungen ausgeführt werden, sowie für das Warehouse, in dem in diesen Beispielen alle Aufgaben ausgeführt werden.

CREATE object

Verschiedene CREATE object-Berechtigungen für das Schema, in dem die SQL-Anweisungen ausgeführt werden, um Objekte wie Tabellen, Streams und Aufgaben zu erstellen.

Weitere Informationen zur Zugriffssteuerung in Snowflake finden Sie unter Übersicht zur Zugriffssteuerung.

Transformieren geladener JSON-Daten nach einem Zeitplan

Im folgenden Beispiel werden JSON-Rohdaten in eine einzige Zieltabelle mit dem Namen raw geladen. Zwei Aufgaben führen Abfragen auf Tabellenstreams aus, die in der Tabelle raw erstellt wurden, und fügen Teilmengen von Zeilen in mehrere Tabellen ein. Da bei jeder Aufgabe die Change Data Capture-Datensätze eines Tabellenstreams verwendet werden, sind mehrere Datenstreams erforderlich.

-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);

-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;

-- Create a second stream to capture inserts to the landing table.
-- A second task will consume another set of columns from this stream.
create or replace stream rawstream2 on table raw;

-- Create a table that stores the names of office visitors identified in the raw data.
create or replace table names (id int, first_name string, last_name string);

-- Create a table that stores the visitation dates of office visitors identified in the raw data.
create or replace table visits (id int, dt date);

-- Create a task that inserts new name records from the rawstream1 stream into the names table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_names
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
merge into names n
  using (select var:id id, var:fname fname, var:lname lname from rawstream1) r1 on n.id = to_number(r1.id)
  when matched then update set n.first_name = r1.fname, n.last_name = r1.lname
  when not matched then insert (id, first_name, last_name) values (r1.id, r1.fname, r1.lname)
;

-- Create another task that merges visitation records from the rawstream1 stream into the visits table
-- every minute when the stream contains records.
-- Records with new IDs are inserted into the visits table;
-- Records with IDs that exist in the visits table update the DT column in the table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_visits
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream2')
as
merge into visits v
  using (select var:id id, var:visit_dt visit_dt from rawstream2) r2 on v.id = to_number(r2.id)
  when matched then update set v.dt = r2.visit_dt
  when not matched then insert (id, dt) values (r2.id, r2.visit_dt)
;

-- Resume both tasks.
alter task raw_to_names resume;
alter task raw_to_visits resume;

-- Insert a set of records into the landing table.
insert into raw
  select parse_json(column1)
  from values
  ('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'),
  ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}');

-- Query the change data capture record in the table streams
select * from rawstream1;
select * from rawstream2;

-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- Query the table streams again.
-- Records should be consumed and no longer visible in streams.

-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;

-- Insert another set of records into the landing table.
-- The records include both new and existing IDs in the target tables.
insert into raw
  select parse_json(column1)
  from values
  ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-25"}'),
  ('{"id": "789","fname": "Ana","lname": "Glass","visit_dt": "2019-09-25"}');

-- Wait for the tasks to run.
call system$wait(70);

-- Records should be consumed and no longer visible in streams.
select * from rawstream1;
select * from rawstream2;

-- Verify the records were inserted into the target tables.
select * from names;
select * from visits;
Copy

Entladen von Daten nach einem Zeitplan

Im folgenden Beispiel werden die Change Data Capture-Datensätze in einem Stream in eine interne (d. h. Snowflake) Stagingbereich entladen.

-- Use the landing table from the previous example.
-- Alternatively, create a landing table.
-- Snowpipe could load data into this table.
create or replace table raw (id int, type string);

-- Create a stream on the table.  We will use this stream to feed the unload command.
create or replace stream rawstream on table raw;

-- Create a task that executes the COPY statement every minute.
-- The COPY statement reads from the stream and loads into the table stage for the landing table.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task unloadtask
warehouse = mywh
schedule = '1 minute'
when
  system$stream_has_data('RAWSTREAM')
as
copy into @%raw/rawstream from rawstream overwrite=true;
;

-- Resume the task.
alter task unloadtask resume;

-- Insert raw data into the landing table.
insert into raw values (3,'processed');

-- Query the change data capture record in the table stream
select * from rawstream;

-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- Records should be consumed and no longer visible in the stream.
select * from rawstream;

-- Verify the COPY statement unloaded a data file into the table stage.
ls @%raw;
Copy

Aktualisieren externer Tabellenmetadaten nach einem Zeitplan

Im folgenden Beispiel werden die Metadaten einer externen Tabelle mit dem Namen mydb.myschema.exttable (unter Verwendung von ALTER EXTERNAL TABLE … REFRESH) nach einem Zeitplan aktualisiert.

Bemerkung

Wenn eine externe Tabelle erstellt wird, wird der Parameter AUTO_REFRESH standardmäßig auf TRUE gesetzt. Wir empfehlen, diesen Standardwert für externe Tabellen zu akzeptieren, die auf Datendateien in Amazon S3- oder Microsoft Azure-Stagingbereichen verweisen. Die Option für die automatische Aktualisierung ist derzeit jedoch nicht für externe Tabellen verfügbar, die auf Google Cloud Storage-Stagingbereiche verweisen. Für diese externen Tabellen kann es hilfreich sein, die Metadaten nach einem Zeitplan manuell zu aktualisieren.

-- Create a task that executes an ALTER EXTERNAL TABLE ... REFRESH statement every 5 minutes.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
CREATE TASK exttable_refresh_task
WAREHOUSE=mywh
SCHEDULE='5 minutes'
  AS
ALTER EXTERNAL TABLE mydb.myschema.exttable REFRESH;
Copy