テーブルストリームを使用した変更追跡

ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。このプロセスは、変更データキャプチャ(CDC)と呼ばれます。個々のテーブルストリームは、 ソーステーブル の行に加えられた変更を追跡します。テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。これにより、トランザクション形式で一連の変更レコードをクエリおよび使用できます。

このトピックの内容:

テーブルストリームの概要

テーブルストリームは、作成されると、特定の時点(オフセット)をテーブルの現在のトランザクションバージョンとして初期化することにより、ソーステーブル内のすべての行の初期スナップショットを論理的に取得します。ストリームで使用される変更追跡システムは、このスナップショットの取得後にコミットされた DML 変更(挿入、更新、削除)に関する情報を記録します。変更レコードは、変更前後の行の状態を提供します。変更情報は、追跡されるソーステーブルの列構造を反映し、各変更イベントを説明する追加のメタデータ列を含みます。

ストリーム自体には、テーブルデータが 含まれない ことに注意してください。ストリームは、ソーステーブルのオフセットのみを格納し、ソーステーブルのバージョン管理履歴を活用して CDC レコードを返します。テーブルの最初のストリームが作成されると、非表示の列のペアがソーステーブルに追加され、変更追跡メタデータの保存が開始されます。これらの列は少量のストレージを消費します。ストリームのクエリ時に返される CDC レコードは、ストリームに保存されている オフセット と、テーブルに保存されている 変更追跡メタデータ の組み合わせに依存します。

ストリームをブックマークと考えると便利な場合があります。ブックマークは、ブックのページ(つまり、ソーステーブル)の特定の時点を示します。ブックマークは破棄でき、他のブックマークをブックのさまざまな場所に挿入できます。同様に、ストリームをドロップし、同じまたは異なる時点で他のストリームを作成することができ(異なる時間に連続してストリームを作成するか、 Time Travel を使用)、同じまたは異なるオフセットでテーブルの変更レコードを消費します。

CDC レコードのコンシューマーの一例は、 データパイプライン です。このパイプラインでは、最後の抽出以降に変更されたステージングテーブルのデータのみが変換され、他のテーブルにコピーされます。

現在、ストリームはマテリアライズドビューの変更を追跡できません。

テーブルのバージョン管理

ストリームは、 offset と呼ばれるソーステーブルのトランザクションバージョンのタイムラインにポイントを保持します。これは、ストリームの内容が DML ステートメントを使用して最後に消費されたトランザクションポイントから開始します。ストリームは、ソーステーブル(つまり、テーブルの現在のバージョン)の現在のオフセットから現在のトランザクション時間までの一連の変更を提供できます。ストリームは、変更のデルタのみを維持します。複数の DML ステートメントが行を変更する場合、ストリームにはその行で行われた最新のアクションのみが含まれます。

複数のクエリは、オフセットを変更することなく、ストリームから同じ変更データを独立して使用できます。ストリームは、 autocommit トランザクション(つまり、明示的にトランザクションを開始せずに実行される)を含む DML トランザクションで使用される場合にのみオフセットを進めます。デフォルトでは、自動コミットトランザクションは、成功すると自動的にコミットされるか、ステートメントの最後で失敗するとロールバックされます。この動作は AUTOCOMMIT パラメーターで制御されます)。

SQL ステートメントが明示的なトランザクション内でストリームをクエリすると、ストリームはステートメントが実行されたときではなく、トランザクションが開始されたときのストリームアドバンスポイント(つまり、タイムスタンプ)でクエリされます。この動作は、 DML ステートメントと、既存のストリームの行を新しいテーブルに取り込む CREATE TABLE ... AS SELECT (CTAS)ステートメントの両方に関係します。

ストリームの現在のオフセットは、 SYSTEM$STREAM_GET_TABLE_TIMESTAMP 関数をクエリすることにより決定できます。

ストリームから選択する DML ステートメントは、トランザクションが正常にコミットされる限り、ストリーム内のすべての変更データを消費します。複数のステートメントがストリーム内の同じ変更レコードにアクセスするようにするには、それらを明示的なトランザクションステートメント(BEGIN .. COMMIT)で囲みます。これにより、ストリームがロックされます。DML は並列トランザクションでのソーステーブルの更新は、変更追跡システムによって追跡されますが、明示的なトランザクションステートメントがコミットされ、既存の変更データが消費されるまでストリームを更新しません。

反復可能な読み取り分離

ストリームは反復可能な読み取り分離をサポートします。反復可能読み取りモードでは、トランザクション内の複数の SQL ステートメントがストリーム内の同じレコードセットを参照します。これは、テーブルでサポートされている読み取りコミットモードとは異なります。このモードでは、ステートメントは同じトランザクション内で実行された以前のステートメントによって行われた変更を、これらの変更がまだコミットされていない場合でも、確認します。

トランザクションでストリームによって返されるデルタレコードは、ストリームの現在位置からトランザクション開始時間までの範囲です。トランザクションがコミットされると、ストリーム位置はトランザクション開始時間まで進みます。それ以外の場合は、同じ位置に留まります。

次の例を考えてみましょう:

時間

トランザクション1

トランザクション2

1

トランザクションを開始します。

2

テーブル t1 のクエリストリーム s1 。ストリームは、現在の位置からトランザクション1の開始時刻までの変更データキャプチャレコード . を返します。ストリームが DML ステートメント . で使用されている場合、同時トランザクションによる変更を回避するためにストリームがロックされます。

3

テーブル t1 の行を更新します。

4

クエリストリーム s1Time 2 に使用されたときと同じストリームの状態を返します。

5

トランザクションをコミットします。ストリームがトランザクション内の DML ステートメントで消費された場合、ストリーム位置はトランザクション開始時間まで進みます。

6

トランザクションを開始します。

7

クエリストリーム s1。結果には、トランザクション1によってコミットされたテーブルの変更が含まれます。

トランザクション1内で、ストリーム s1 へのすべてのクエリは同じレコードセットを参照します。テーブル t1 へのDML 変更は、トランザクションがコミットされたときにのみストリームに記録されます。

トランザクション2で、ストリームへのクエリは、トランザクション1でテーブルに記録された変更を確認します。トランザクション1がコミットされる にトランザクション2が開始した場合、ストリームへのクエリは、ストリームの位置からトランザクション2の開始時刻までのストリームのスナップショットを返します。トランザクション1によってコミットされた変更は表示されません。

ストリーム列

ストリームは、ソーステーブルと同じ形状(つまり、同じ列名と順序)でデータを格納しますが、次の追加の列があります:

METADATA$ACTION

記録された DML 操作(INSERT、 DELETE)を示します。

METADATA$ISUPDATE

操作が UPDATE ステートメントの一部であったかどうかを示します。ソーステーブルの行の更新は、ストリーム内の DELETE および INSERT レコードのペアとして表され、メタデータ列の METADATA$ISUPDATE 値は TRUEに設定されます。

ストリームは2つのオフセットの違いを記録することに注意してください。行が追加され、現在のオフセットで更新された場合、デルタの変更は新しい行になります。 METADATA$ISUPDATE 行には FALSE 値が記録されます。

METADATA$ROW_ID

行の一意かつ不変の ID を指定します。これは、特定の行への変更を経時的に追跡するために使用できます。

ストリームのタイプ

次のストリーム型は、それぞれによって記録されたメタデータに基づいて使用できます。

Standard

標準(つまり、デルタ)テーブルストリームは、挿入、更新、削除(テーブルの切り捨てを含む)を含む、ソーステーブルに対するすべての DML の変更を追跡します。このストリーム型は、変更セットで挿入および削除された行で結合を実行して、行レベルのデルタを提供します。たとえば、実質的な効果として、テーブル内の2つのトランザクションポイントの間に挿入されてから削除された行は、デルタで削除されます(つまり、ストリームのクエリ時に返されない)。

追加のみ

追加専用のテーブルストリームは、行の挿入のみを追跡します。更新および削除操作(テーブルの切り捨てを含む)は記録されません。たとえば、10行がテーブルに挿入され、追加専用ストリームのオフセットが進む前にそれらの行の5が削除された場合、ストリームは10行を記録します。

追加専用ストリームは追加された行のみを返すため、抽出、ロード、変換(ELT)、および行の挿入のみに依存する類似のシナリオのパフォーマンスが、標準ストリームの場合よりもはるかに向上します。例えば、追加専用ストリームの行が消費された直後にソーステーブルを切り捨てることができ、レコードの削除は、次にストリームがクエリまたは消費されるときにオーバーヘッドに寄与しません。

挿入のみ

外部テーブルでのみサポートされます。 挿入のみのストリームは、行の挿入のみを追跡します。挿入されたセットから行を削除する削除操作(つまり、何もしない)は記録されません。例えば、任意の2つのオフセットの間で、外部テーブルによって参照されるクラウドストレージの場所からFile1が削除され、File2が追加された場合、ストリームはFile2の行のレコードのみを返します。標準テーブルの CDC データを追跡する場合とは異なり、Snowflakeはクラウドストレージ内のファイルの履歴レコードにアクセスできません。

注釈

挿入のみのテーブルストリームのサポートは、 プレビュー機能 として提供されます。

データフロー

次の図は、ソーステーブルの行が更新されるときに標準テーブルストリームの内容がどのように変化するかを示しています。 DML ステートメントがストリームコンテンツを消費するたびに、ストリーム位置が進み、テーブルに対する次の DML の変更(テーブルバージョンの変更)を追跡します。

Streams Example

データ保持期間と陳腐化

ストリームのオフセットが、ソーステーブルのデータ保持期間の外にある場合、ストリームは古くなっています。ストリームが古くなると、未使用の変更レコードを含め、ソーステーブルの履歴データにアクセスできなくなります。テーブルの 新しい 変更レコードを追跡するには、ストリームを再作成します( CREATE STREAM を使用)。ストリームが古くなるのを防ぐには、テーブルの保持期間中にトランザクション内でストリームレコードを消費します。データ保持期間の詳細については、 Time Travelの理解と使用 をご参照ください。

ソーステーブルのデータ保持期間が 14日未満 であり、ストリームが消費されていない場合、Snowflakeはこの期間を一時的に延長して、古くならないようにします。アカウントの Snowflake Edition に関係なく、期間はストリームのオフセットまで、最大14日まで延長されます。ストリームが消費されると、データ保持期間の延長はテーブルのデフォルト期間に短縮されます。

ストリームが古くなったかどうかを判断するには、 DESCRIBE STREAM または SHOW STREAMS コマンドを実行します。コマンド出力で、ストリームの STALE 列の値が TRUE の場合、ストリームは古くなっています。

注釈

現在、ソーステーブルとストリームを含むデータベースまたはスキーマのクローンが作成されると、(クローン内の)ストリーム内の未使用のレコードにアクセスできなくなります。この動作は、テーブルの Time Travel と一致しています。テーブルのクローンが作成される場合、テーブルクローンの履歴データは、クローンが作成された時間/時点で開始されます。

タスクを使用したストリーム消費

単一のテーブルストリームから変更データを消費する複数のタスクは、異なるデルタを取得します。タスクが DML ステートメントを使用してストリーム内の変更データを消費すると、ストリームはオフセットを進めます。変更データは、次に消費するタスクで使用できなくなります。現在は、単一のタスクのみがストリームの変更データを消費することをお勧めします。同じテーブルに対して複数のストリームを作成し、異なるタスクで使用できます。

CHANGES 句:ストリームに対する読み取り専用の代替

ストリームの代わりに、Snowflakeは SELECT ステートメントの CHANGES 句を使用して、テーブルの変更追跡メタデータのクエリをサポートします。 CHANGES 句を使用すると、明示的なトランザクションオフセットを使用してテーブルストリームを作成しなくても、2つの時点間で変更追跡メタデータをクエリできます。 CHANGES 句を使用しても、オフセットは前倒し されません (つまり、レコードを使用)。複数のクエリにより、異なるトランザクションの開始と終了の間で変更追跡メタデータを取得できます。このオプションでは、 AT | BEFORE 句を使用してメタデータのトランザクションの開始点を指定する必要があります。変更追跡間隔のエンドポイントは、オプションの END 句を使用して設定できます。

ストリームは、テーブルの現在のトランザクションバージョンを格納し、ほとんどのシナリオで CDC レコードの適切なソースです。任意の期間のオフセットを管理する必要があるまれなシナリオでは、 CHANGES 句を使用できます。

注釈

現在、テーブルの変更追跡メタデータが記録される前に、次の いずれか がtrueである必要があります。

  • テーブルで変更追跡が有効になります( ALTER TABLE ... CHANGE_TRACKING = TRUE を使用)。

  • テーブルのストリームが作成されます( CREATE STREAM を使用)。

どちらのオプションでも、非表示の列のペアがテーブルに追加され、変更追跡メタデータの保存が開始されます。列は少量のストレージを消費します。

これらの条件のいずれかが満たされる前の期間では、テーブルの変更追跡メタデータは使用できません。

共有テーブルのストリーム

共有テーブルにストリームを作成すると、データコンシューマーはそれらのテーブルで行われたデータ操作言語( DML )の変更を追跡できます。この機能は、「ローカル」テーブル(つまり、ストリームと同じアカウント)でのストリームの作成と使用に似ています。このトピックでは、データプロバイダーが共有テーブルを構成してストリームを作成し、コンシューマーがストリームを作成する手順について説明します。

手順については、 をご参照ください。

ストリームの請求

このトピックの データ保持期間と陳腐化 で説明したように、ストリームが定期的に消費されない場合、Snowflakeはソーステーブルのデータ保持期間を 一時的に 延長します。テーブルのデータ保持期間が14日未満の場合、アカウントの Snowflakeエディション に関係なく、期間はストリームトランザクションオフセット より短く 、または14日(テーブルのデータ保持期間が14日未満の場合)に、バックグラウンドで延長されます。

拡張データ保持期間には追加のストレージが必要であり、これは毎月のストレージ料金に反映されます。

ストリームに関連する主なコストは、仮想ウェアハウスがストリームをクエリするために使用する処理時間です。これらの料金は、なじみのあるSnowflakeクレジットとして請求書に表示されます。

ストリーム DDL

ストリームの作成と管理をサポートするために、Snowflakeは次の一連の特別な DDL コマンドを提供します:

さらに、プロバイダーは、次の標準アクセス制御 DDLを使用して、 ELT に必要なデータベースオブジェクトへのアクセスを表示、許可、または取り消すことができます:

必要なアクセス権限

ストリームを作成および管理するには、少なくとも次のロール権限を持つロールが必要です:

オブジェクト

権限

注意

データベース

USAGE

スキーマ

USAGE, CREATE STREAM

テーブル

SELECT

ストリームのクエリには、少なくとも次のロール権限を持つロールが必要です:

オブジェクト

権限

注意

データベース

USAGE

スキーマ

USAGE

ストリーム

SELECT

テーブル

SELECT

例1

次の例は、ソーステーブルで DML ステートメントが実行されると、ストリームの内容がどのように変化するかを示しています。

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Create a stream to track changes to date in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
  );

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members who joined the gym after a free trial period ended:
MERGE INTO members m
  USING (
    SELECT id, dt
    FROM signup s
    WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
    ON m.id = s.id
  WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

例2

次の例は、標準(デルタ)ストリームと追加専用ストリームの動作の違いを示しています。

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace  stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace  stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = '0';

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the 3 streams.
create or replace  table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

次の例は、ストリームを ELT (抽出、ロード、変換)プロセスで使用する方法を示しています。この例では、ステージングテーブルに挿入された新しいデータはストリームによって追跡されます。一連の SQL ステートメントは、ストリームコンテンツを変換して一連のプロダクションテーブルに挿入します。

例3

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
    id number(8),
    ts TIMESTAMP
    );

CREATE OR REPLACE TABLE data_prod2 (
    id number(8),
    color VARCHAR,
    num NUMBER
    );

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

--  Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp (t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+-------------------------+
|   ID | TS                      |
|------+-------------------------|
| 7077 | 2018-08-14 20:57:01.000 |
| 7078 | 2018-08-14 21:07:26.000 |
+------+-------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+