テーブルストリームを使用した変更追跡

ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。このプロセスは、変更データキャプチャ(CDC)と呼ばれます。個々のテーブルストリームは、 ソーステーブル の行に加えられた変更を追跡します。テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。これにより、トランザクション形式で一連の変更記録をクエリおよび使用できます。

ストリームは、標準(ローカル)テーブル、 外部テーブル、および ディレクトリテーブル でサポートされています。

このトピックの内容:

テーブルストリームの概要

テーブルストリームは、作成されると、特定の時点(オフセット)をテーブルの現在のトランザクションバージョンとして初期化することにより、ソーステーブル内のすべての行の初期スナップショットを論理的に取得します。ストリームで使用される変更追跡システムは、このスナップショットの取得後にコミットされた DML 変更に関する情報を記録します。変更レコードは、変更前後の行の状態を提供します。変更情報は、追跡されるソーステーブルの列構造を反映し、各変更イベントを説明する追加のメタデータ列を含みます。

ストリーム自体には、テーブルデータが 含まれない ことに注意してください。ストリームは、ソーステーブルのオフセットのみを格納し、ソーステーブルのバージョン管理履歴を活用して CDC レコードを返します。テーブルの最初のストリームが作成されると、非表示の列のペアがソーステーブルに追加され、変更追跡メタデータの保存が開始されます。これらの列は少量のストレージを消費します。ストリームのクエリ時に返される CDC レコードは、ストリームに保存されている オフセット と、テーブルに保存されている 変更追跡メタデータ の組み合わせに依存します。

ストリームをブックマークと考えると便利な場合があります。ブックマークは、ブックのページ(つまり、ソーステーブル)の特定の時点を示します。ブックマークは破棄でき、他のブックマークをブックのさまざまな場所に挿入できます。同様に、ストリームをドロップし、同じまたは異なる時点で他のストリームを作成することができ(異なる時間に連続してストリームを作成するか、 Time Travel を使用)、同じまたは異なるオフセットでテーブルの変更レコードを消費します。

CDC レコードのコンシューマーの一例は、 データパイプライン です。このパイプラインでは、最後の抽出以降に変更されたステージングテーブルのデータのみが変換され、他のテーブルにコピーされます。

現在、ストリームはマテリアライズドビューの変更を追跡できません。

テーブルのバージョン管理

1つ以上の DML ステートメントを含むトランザクションがテーブルにコミットされるたびに、新しいテーブルバージョンが作成されます。テーブルのトランザクション履歴では、ストリームオフセットは2つのテーブルバージョンの間にあります。ストリームをクエリすると、オフセット後および現在の時刻以前にコミットされたトランザクションによって発生した変更が返されます。

次の例は、タイムラインに10のコミットされたバージョンを持つソーステーブルを示しています。ストリーム s1 のオフセットは、現在、テーブルバージョン v3v4 の間です。ストリームがクエリ(または消費)されると、返されるレコードには、テーブルタイムラインのストリームオフセットの直後のバージョンであるテーブルバージョン v4 から、タイムラインの最新のコミット済みテーブルバージョンである v10 までのすべてのトランザクションが含まれます。

Stream offset example

ストリームは、現在のオフセットから現在のバージョンのテーブルへの最小限の変更セットを提供します。

複数のクエリは、オフセットを変更することなく、ストリームから同じ変更データを独立して使用できます。ストリームは、DML トランザクションで使用される場合に のみ オフセットを進めます。この動作は、明示的トランザクションと 自動コミット トランザクションの両方に適用されます。(デフォルトでは、DML ステートメントが実行されると、自動コミットトランザクションが暗黙的に開始され、ステートメントの完了時にトランザクションがコミットされます。この動作は AUTOCOMMIT パラメーターで制御されます)。ストリームをクエリするだけでは、明示的なトランザクション内であっても、そのオフセットは進みません。ストリームの内容は、DML ステートメントで消費する必要があります。

SQL ステートメントが明示的なトランザクション内でストリームをクエリすると、ストリームはステートメントが実行されたときではなく、トランザクションが開始されたときのストリームアドバンスポイント(つまり、タイムスタンプ)でクエリされます。この動作は、DML ステートメントと、既存のストリームの行を新しいテーブルに取り込む CREATE TABLE ... AS SELECT (CTAS)ステートメントの両方に関係します。

ストリームから選択する DML ステートメントは、トランザクションが正常にコミットされる限り、ストリーム内のすべての変更データを消費します。複数のステートメントがストリーム内の同じ変更レコードにアクセスするようにするには、それらを明示的なトランザクションステートメント(BEGIN .. COMMIT)で囲みます。これにより、ストリームがロックされます。DML は並列トランザクションでのソーステーブルの更新は、変更追跡システムによって追跡されますが、明示的なトランザクションステートメントがコミットされ、既存の変更データが消費されるまでストリームを更新しません。

反復可能な読み取り分離

ストリームは反復可能な読み取り分離をサポートします。反復可能読み取りモードでは、トランザクション内の複数の SQL ステートメントがストリーム内の同じレコードセットを参照します。これは、テーブルでサポートされている読み取りコミットモードとは異なります。このモードでは、ステートメントは同じトランザクション内で実行された以前のステートメントによって行われた変更を、これらの変更がまだコミットされていない場合でも、確認します。

トランザクションでストリームによって返されるデルタレコードは、ストリームの現在位置からトランザクション開始時間までの範囲です。トランザクションがコミットされると、ストリーム位置はトランザクション開始時間まで進みます。それ以外の場合は、同じ位置に留まります。

次の例を考えてみましょう:

時間

トランザクション1

トランザクション2

1

トランザクションを開始します。

2

テーブル t1 のクエリストリーム s1 。ストリームは、現在の位置からトランザクション1の開始時刻までの変更データキャプチャレコード . を返します。ストリームが DML ステートメント . で使用されている場合、同時トランザクションによる変更を回避するためにストリームがロックされます。

3

テーブル t1 の行を更新します。

4

クエリストリーム s1Time 2 に使用されたときと同じストリームの状態を返します。

5

トランザクションをコミットします。ストリームがトランザクション内の DML ステートメントで消費された場合、ストリーム位置はトランザクション開始時間まで進みます。

6

トランザクションを開始します。

7

クエリストリーム s1。結果には、トランザクション1によってコミットされたテーブルの変更が含まれます。

トランザクション1内で、ストリーム s1 へのすべてのクエリは同じレコードセットを参照します。テーブル t1 へのDML 変更は、トランザクションがコミットされたときにのみストリームに記録されます。

トランザクション2で、ストリームへのクエリは、トランザクション1でテーブルに記録された変更を確認します。トランザクション1がコミットされる にトランザクション2が開始した場合、ストリームへのクエリは、ストリームの位置からトランザクション2の開始時刻までのストリームのスナップショットを返します。トランザクション1によってコミットされた変更は表示されません。

ストリーム列

ストリームには、実際のテーブルの列やデータではなく、ソーステーブルのオフセットが保存されます。クエリを実行すると、ストリームは履歴データにアクセスし、ソーステーブルと同じ形状(つまり、同じ列名と順序)で履歴データを返しますが、次の追加の列があります。

METADATA$ACTION

記録された DML 操作(INSERT、 DELETE)を示します。

METADATA$ISUPDATE

操作が UPDATE ステートメントの一部であったかどうかを示します。ソーステーブルの行の更新は、ストリーム内の DELETE および INSERT レコードのペアとして表され、メタデータ列の METADATA$ISUPDATE 値は TRUEに設定されます。

ストリームは2つのオフセットの違いを記録することに注意してください。行が追加され、現在のオフセットで更新された場合、デルタの変更は新しい行になります。 METADATA$ISUPDATE 行には FALSE 値が記録されます。

METADATA$ROW_ID

行の一意かつ不変の ID を指定します。これは、特定の行への変更を経時的に追跡するために使用できます。

ストリームのタイプ

次のストリーム型は、それぞれによって記録されたメタデータに基づいて使用できます。

Standard

標準(つまり、デルタ)テーブルストリームは、挿入、更新、削除(テーブルの切り捨てを含む)を含む、ソーステーブルに対するすべての DML の変更を追跡します。このストリーム型は、変更セットで挿入および削除された行で結合を実行して、行レベルのデルタを提供します。たとえば、実質的な効果として、テーブル内の2つのトランザクションポイントの間に挿入されてから削除された行は、デルタで削除されます(つまり、ストリームのクエリ時に返されない)。

追加のみ

追加専用のテーブルストリームは、行の挿入のみを追跡します。更新および削除操作(テーブルの切り捨てを含む)は記録されません。たとえば、10行がテーブルに挿入され、追加専用ストリームのオフセットが進む前にそれらの行の5が削除された場合、ストリームは10行を記録します。

追加専用ストリームは追加された行のみを返すため、抽出、ロード、変換(ELT)、および行の挿入のみに依存する類似のシナリオのパフォーマンスが、標準ストリームの場合よりもはるかに向上します。例えば、追加専用ストリームの行が消費された直後にソーステーブルを切り捨てることができ、レコードの削除は、次にストリームがクエリまたは消費されるときにオーバーヘッドに寄与しません。

挿入のみ

外部テーブルでのみサポートされます。 挿入のみのストリームは、行の挿入のみを追跡します。挿入されたセットから行を削除する削除操作(つまり、何もしない)は記録されません。例えば、任意の2つのオフセットの間で、外部テーブルによって参照されるクラウドストレージの場所からFile1が削除され、File2が追加された場合、ストリームはFile2の行のレコードのみを返します。標準テーブルの CDC データを追跡する場合とは異なり、Snowflakeはクラウドストレージ内のファイルの履歴記録にアクセスできません。

上書きまたは追加されたファイルは、基本的に新しいファイルとして処理されます。古いバージョンのファイルはクラウドストレージから削除されますが、挿入専用ストリームは削除操作を記録しません。ファイルの新しいバージョンがクラウドストレージに追加され、挿入専用ストリームは行を挿入として記録します。ストリームは、古いファイルバージョンと新しいファイルバージョンの差分を記録しません。 Azure AppendBlobs を使用している場合など、追加によってテーブルの自動更新がトリガーされない場合があることに注意してください。

データフロー

次の図は、ソーステーブルの行が更新されるときに標準テーブルストリームの内容がどのように変化するかを示しています。 DML ステートメントがストリームコンテンツを消費するたびに、ストリーム位置が進み、テーブルに対する次の DML の変更(テーブルバージョンの変更)を追跡します。

Streams Example

データ保持期間と陳腐化

ストリームのオフセットが、ソーステーブルのデータ保持期間の外にある場合、ストリームは古くなっています。ストリームが古くなると、未使用の変更レコードを含め、ソーステーブルの履歴データにアクセスできなくなります。テーブルの 新しい 変更レコードを追跡するには、ストリームを再作成します( CREATE STREAM を使用)。ストリームが古くなるのを防ぐには、テーブルの保持期間中にトランザクション内でストリームレコードを消費します。データ保持期間の詳細については、 Time Travelの理解と使用 をご参照ください。

ソーステーブルのデータ保持期間が 14日未満 であり、ストリームが消費されていない場合、Snowflakeはこの期間を一時的に延長して、古くならないようにします。アカウントの Snowflake Edition に関係なく、期間はストリームのオフセットまで、デフォルトで最大14日まで延長されます。Snowflakeがデータ保持期間を延長できる最大日数は、 MAX_DATA_EXTENSION_TIME_IN_DAYS パラメータ値によって決定されます。ストリームが消費されると、データ保持期間の延長はテーブルのデフォルト期間に短縮されます。

次の表は、DATA_RETENTION_TIME_IN_DAYSとMAX_DATA_EXTENSION_TIME_IN_DAYSの値の例を示し、古くなることを回避するためにストリームコンテンツを消費する頻度を示しています。

DATA_RETENTION_TIME_IN_DAYS

MAX_DATA_EXTENSION_TIME_IN_DAYS

X日でストリームを消費する

14

0

14

1

14

14

0

90

90

ストリームが古くなったかどうかを判断するには、 DESCRIBE STREAM または SHOW STREAMS コマンドを実行します。コマンド出力で、 STALE 列の値が TRUE の場合、ストリームが古くなっている可能性があります。実際には、ストリームからの読み取りは、予想される STALE_TIME の後しばらくの間成功する可能性があります。ただし、この期間中はいつでもストリームが古くなる可能性があります。

重要

  • ソーステーブルを再作成すると(CREATE OR REPLACE TABLE 構文を使用)、その履歴はドロップされ、テーブル上のストリームも古くなります。

  • 現在、ソーステーブルとストリームを含むデータベースまたはスキーマのクローンが作成されると、ストリームクローン内の未使用の記録にアクセスできなくなります。この動作は、テーブルの Time Travel と一致しています。テーブルのクローンが作成される場合、テーブルクローンの履歴データは、クローンが作成された時間/時点で開始されます。

  • ソーステーブルの名前を変更しても、ストリームが壊れたり、古くなったりすることはありません。さらに、テーブルがドロップされ、同じ名前で新しいテーブルが作成された場合、元のテーブルにリンクされているストリームは、新しいテーブルにリンク されません

ストリームの複数のコンシューマー

ユーザーには、テーブルの変更レコードのコンシューマーごとに個別のストリームを作成することをお勧めします。「コンシューマー」とは、 DML トランザクションを使用してテーブル変更レコードを使用するためのタスク、スクリプト、またはその他のメカニズムのことです。このトピックで前述したように、ストリームは DML トランザクションで使用されるとオフセットを進めます。Time Travelが使用されていない限り、単一ストリーム内の変更データのさまざまなコンシューマーはさまざまなデルタを取得します。DML トランザクションを使用して、ストリーム内の最新のオフセットからキャプチャされた変更データが使用されると、ストリームはオフセットを進めます。次のコンシューマーは、変更データを使用できなくなります。テーブルの 同じ 変更データを使用するには、テーブルに複数のストリームを作成します。ストリームはソーステーブルのオフセットのみを保存し、実際のテーブルデータは保存 しません 。したがって、大きなコストをかけずに、テーブルに任意の数のストリームを作成できます。

CHANGES 句:ストリームに対する読み取り専用の代替

ストリームの代わりに、Snowflakeは SELECT ステートメントの CHANGES 句を使用して、テーブルの変更追跡メタデータのクエリをサポートします。 CHANGES 句を使用すると、明示的なトランザクションオフセットを使用してテーブルストリームを作成しなくても、2つの時点間で変更追跡メタデータをクエリできます。 CHANGES 句を使用しても、オフセットは前倒し されません (つまり、レコードを使用)。複数のクエリにより、異なるトランザクションの開始と終了の間で変更追跡メタデータを取得できます。このオプションでは、 AT | BEFORE 句を使用してメタデータのトランザクションの開始点を指定する必要があります。変更追跡間隔のエンドポイントは、オプションの END 句を使用して設定できます。

ストリームは、テーブルの現在のトランザクションバージョンを格納し、ほとんどのシナリオで CDC レコードの適切なソースです。任意の期間のオフセットを管理する必要があるまれなシナリオでは、 CHANGES 句を使用できます。

注釈

現在、テーブルの変更追跡メタデータが記録される前に、次の いずれか がtrueである必要があります。

  • テーブルで変更追跡が有効になります( ALTER TABLE ... CHANGE_TRACKING = TRUE を使用)。

  • テーブルのストリームが作成されます( CREATE STREAM を使用)。

どちらのオプションでも、非表示の列のペアがテーブルに追加され、変更追跡メタデータの保存が開始されます。これら非表示の CDC データ列の値は、ストリーム メタデータ列 への入力を提供します。列は少量のストレージを消費します。

これらの条件のいずれかが満たされる前の期間では、テーブルの変更追跡メタデータは使用できません。

共有テーブルのストリーム

共有テーブルにストリームを作成すると、データコンシューマーはそれらのテーブルで行われたデータ操作言語( DML )の変更を追跡できます。この機能は、「ローカル」テーブル(つまり、ストリームと同じアカウント)でのストリームの作成と使用に似ています。このトピックでは、データプロバイダーが共有テーブルを構成してストリームを作成し、コンシューマーがストリームを作成する手順について説明します。

手順については、次をご参照ください。

データプロバイダー

共有の操作

データコンシューマー

データコンシューマー

ストリームの請求

このトピックの データ保持期間と陳腐化 で説明したように、ストリームが定期的に消費されない場合、Snowflakeはソーステーブルのデータ保持期間を 一時的に 延長します。テーブルのデータ保持期間が14日未満の場合、アカウントの Snowflakeエディション に関係なく、期間はストリームトランザクションオフセット より短く 、または14日(テーブルのデータ保持期間が14日未満の場合)に、バックグラウンドで延長されます。

拡張データ保持期間には追加のストレージが必要であり、これは毎月のストレージ料金に反映されます。

ストリームに関連する主なコストは、仮想ウェアハウスがストリームをクエリするために使用する処理時間です。これらの料金は、なじみのあるSnowflakeクレジットとして請求書に表示されます。

ストリーム DDL

ストリームの作成と管理をサポートするために、Snowflakeは次の一連の特別な DDL コマンドを提供します:

さらに、プロバイダーは、次の標準アクセス制御 DDLを使用して、 ELT に必要なデータベースオブジェクトへのアクセスを表示、許可、または取り消すことができます:

必要なアクセス権限

ストリームを作成および管理するには、少なくとも次のロール権限を持つロールが必要です:

オブジェクト

権限

注意

データベース

USAGE

スキーマ

USAGE, CREATE STREAM

テーブル

SELECT

注釈

ソーステーブルで変更の追跡が有効になっていない場合(ALTER TABLE ... SET CHANGE_TRACKING = TRUE を使用)は、テーブルの所有者(つまり、テーブルに対する OWNERSHIP 権限を持つロール)のみが、テーブルの最初のストリームを作成できます。最初のストリームを作成すると、テーブルでの変更の追跡が自動で有効になります。

ストリームのクエリには、少なくとも次のロール権限を持つロールが必要です。

オブジェクト

権限

注意

データベース

USAGE

スキーマ

USAGE

ストリーム

SELECT

テーブル

SELECT

例1

次の例は、ソーステーブルで DML ステートメントが実行されると、ストリームの内容がどのように変化するかを示しています。

-- Create a table to store the names and fees paid by members of a gym
CREATE OR REPLACE TABLE members (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Create a stream to track changes to date in the MEMBERS table
CREATE OR REPLACE STREAM member_check ON TABLE members;

-- Create a table to store the dates when gym members joined
CREATE OR REPLACE TABLE signup (
  id number(8),
  dt DATE
  );

INSERT INTO members (id,name,fee)
VALUES
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

INSERT INTO signup
VALUES
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

-- The stream records the inserted rows
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |   0 | INSERT          | False             | d200504bf3049a7d515214408d9a804fd03b46cd |
|  2 | Jane   |   0 | INSERT          | False             | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
|  3 | George |   0 | INSERT          | False             | b98ad609fffdd6f00369485a896c52ca93b92b1f |
|  4 | Betty  |   0 | INSERT          | False             | e554e6e68293a51d8e69d68e9b6be991453cc901 |
|  5 | Sally  |   0 | INSERT          | False             | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Apply a $90 fee to members who joined the gym after a free trial period ended:
MERGE INTO members m
  USING (
    SELECT id, dt
    FROM signup s
    WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s
    ON m.id = s.id
  WHEN MATCHED THEN UPDATE SET m.fee = 90;

SELECT * FROM members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

-- The stream records the updated FEE column as a set of inserts
-- rather than deletes and inserts because the stream contents
-- have not been consumed yet
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    |  90 | INSERT          | False             | 957e84b34ef0f3d957470e02bddccb027810892c |
|  2 | Jane   |  90 | INSERT          | False             | b00168a4edb9fb399dd5cc015e5f78cbea158956 |
|  3 | George |  90 | INSERT          | False             | 75206259362a7c89126b7cb039371a39d821f76a |
|  4 | Betty  |   0 | INSERT          | False             | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 |
|  5 | Sally  |   0 | INSERT          | False             | 5a68f6296c975980fbbc569ce01033c192168eca |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

-- Create a table to store member details in production
CREATE OR REPLACE TABLE members_prod (
  id number(8) NOT NULL,
  name varchar(255) default NULL,
  fee number(3) NULL
);

-- Insert the first batch of stream data into the production table
INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT';

-- The stream position is advanced
select * from member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Access and lock the stream
BEGIN;

-- Increase the fee paid by paying members
UPDATE members SET fee = fee + 15 where fee > 0;

+------------------------+-------------------------------------+
| number of rows updated | number of multi-joined rows updated |
|------------------------+-------------------------------------|
|                      3 |                                   0 |
+------------------------+-------------------------------------+

-- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current
-- transactional time point, which is the beginning time of the transaction
SELECT * FROM member_check;

+----+------+-----+-----------------+-------------------+-----------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----+-----------------+-------------------+-----------------|
+----+------+-----+-----------------+-------------------+-----------------+

-- Commit changes
COMMIT;

-- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now
-- includes the changes in the source table
SELECT * FROM member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME   | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
|  1 | Joe    | 105 | INSERT          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   | 105 | INSERT          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George | 105 | INSERT          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
|  1 | Joe    |  90 | DELETE          | True              | 123a45b67cd0e8f012345g01abcdef012345678a |
|  2 | Jane   |  90 | DELETE          | True              | 456b45b67cd1e8f123456g01ghijkl123456779b |
|  3 | George |  90 | DELETE          | True              | 567890c89de2f9g765438j20jklmn0234567890d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+

例2

次の例は、標準(デルタ)ストリームと追加専用ストリームの動作の違いを示しています。

-- Create a source table.
create or replace table t(id int, name string);

-- Create a standard stream on the source table.
create or replace  stream delta_s on table t;

-- Create an append-only stream on the source table.
create or replace  stream append_only_s on table t append_only=true;

-- Insert 3 rows into the source table.
insert into t values (0, 'charlie brown');
insert into t values (1, 'lucy');
insert into t values (2, 'linus');

-- Delete 1 of the 3 rows.
delete from t where id = '0';

-- The standard stream removes the deleted row.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  1 | lucy  | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not remove the deleted row.
select * from append_only_s order by id;

+----+---------------+-----------------+-------------------+------------------------------------------+
| ID | NAME          | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+---------------+-----------------+-------------------+------------------------------------------|
|  0 | charlie brown | INSERT          | False             | e83abf629af50ccf94d1e78c547bfd8079e68d00 |
|  1 | lucy          | INSERT          | False             | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 |
|  2 | linus         | INSERT          | False             | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+---------------+-----------------+-------------------+------------------------------------------+

-- Create a table to store the change data capture records in each of the streams.
create or replace  table t2(id int, name string, stream_type string default NULL);

-- Insert the records from the streams into the new table, advancing the offset of each stream.
insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s;
insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s;

-- Update a row in the source table.
update t set name = 'sally' where name = 'linus';

-- The standard stream records the update operation.
select * from delta_s order by id;

+----+-------+-----------------+-------------------+------------------------------------------+
| ID | NAME  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-------+-----------------+-------------------+------------------------------------------|
|  2 | sally | INSERT          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
|  2 | linus | DELETE          | True              | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 |
+----+-------+-----------------+-------------------+------------------------------------------+

-- The append-only stream does not record the update operation.
select * from append_only_s order by id;

+----+------+-----------------+-------------------+-----------------+
| ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+------+-----------------+-------------------+-----------------|
+----+------+-----------------+-------------------+-----------------+

次の例は、ストリームを ELT (抽出、ロード、変換)プロセスで使用する方法を示しています。この例では、ステージングテーブルに挿入された新しいデータはストリームによって追跡されます。一連の SQL ステートメントは、ストリームコンテンツを変換して一連のプロダクションテーブルに挿入します。

例3

-- Create a staging table that stores raw JSON data
CREATE OR REPLACE TABLE data_staging (
  raw variant);

-- Create a stream on the staging table
CREATE OR REPLACE STREAM data_check ON TABLE data_staging;

-- Create 2 production tables to store transformed
-- JSON data in relational columns
CREATE OR REPLACE TABLE data_prod1 (
    id number(8),
    ts TIMESTAMP_TZ
    );

CREATE OR REPLACE TABLE data_prod2 (
    id number(8),
    color VARCHAR,
    num NUMBER
    );

-- Load JSON data into staging table
-- using COPY statement, Snowpipe,
-- or inserts

SELECT * FROM data_staging;

+--------------------------------------+
| RAW                                  |
|--------------------------------------|
| {                                    |
|   "id": 7077,                        |
|   "x1": "2018-08-14T20:57:01-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "green",                 |
|       "y2": "35"                     |
|     }                                |
|   ]                                  |
| }                                    |
| {                                    |
|   "id": 7078,                        |
|   "x1": "2018-08-14T21:07:26-07:00", |
|   "x2": [                            |
|     {                                |
|       "y1": "cyan",                  |
|       "y2": "107"                    |
|     }                                |
|   ]                                  |
| }                                    |
+--------------------------------------+

--  Stream table shows inserted data
SELECT * FROM data_check;

+--------------------------------------+-----------------+-------------------+------------------------------------------+
| RAW                                  | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|--------------------------------------+-----------------+-------------------|------------------------------------------|
| {                                    | INSERT          | False             | 789012e01ef4j3k890123k35mnopqr567890124j |
|   "id": 7077,                        |                 |                   |                                          |
|   "x1": "2018-08-14T20:57:01-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "green",                 |                 |                   |                                          |
|       "y2": "35"                     |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
| {                                    | INSERT          | False             | 765432u89tk3l6y456789012rst7vx678912456k |
|   "id": 7078,                        |                 |                   |                                          |
|   "x1": "2018-08-14T21:07:26-07:00", |                 |                   |                                          |
|   "x2": [                            |                 |                   |                                          |
|     {                                |                 |                   |                                          |
|       "y1": "cyan",                  |                 |                   |                                          |
|       "y2": "107"                    |                 |                   |                                          |
|     }                                |                 |                   |                                          |
|   ]                                  |                 |                   |                                          |
| }                                    |                 |                   |                                          |
+--------------------------------------+-----------------+-------------------+------------------------------------------+

-- Access and lock the stream
BEGIN;

-- Transform and copy JSON elements into relational columns
-- in the production tables
INSERT INTO data_prod1 (id, ts)
SELECT t.raw:id, to_timestamp_tz(t.raw:x1)
FROM data_check t
WHERE METADATA$ACTION = 'INSERT';

INSERT INTO data_prod2 (id, color, num)
SELECT t.raw:id, f.value:y1, f.value:y2
FROM data_check t
, lateral flatten(input => raw:x2) f
WHERE METADATA$ACTION = 'INSERT';

-- Commit changes in the stream objects participating in the transaction
COMMIT;

SELECT * FROM data_prod1;

+------+---------------------------+
|   ID | TS                        |
|------+---------------------------|
| 7077 | 2018-08-14 20:57:01 -0700 |
| 7078 | 2018-08-14 21:07:26 -0700 |
+------+---------------------------+

SELECT * FROM data_prod2;

+------+-------+-----+
|   ID | COLOR | NUM |
|------+-------+-----|
| 7077 | green |  35 |
| 7078 | cyan  | 107 |
+------+-------+-----+

SELECT * FROM data_check;

+-----+-----------------+-------------------+
| RAW | METADATA$ACTION | METADATA$ISUPDATE |
|-----+-----------------+-------------------|
+-----+-----------------+-------------------+