カテゴリ:

データパイプライン DDL

CREATE STREAM

現在/指定されたスキーマで新しいストリームを作成するか、既存のストリームを置き換えます。ストリームは、挿入、更新、削除に関する情報など、テーブルに加えられたデータ操作言語(DML)の変更を記録します。変更が記録されるテーブルは、 ソーステーブル と呼ばれます。

さらに、このコマンドは次のバリアントをサポートしています。

  • CREATE STREAM ... CLONE (既存のストリームのクローンを作成)

こちらもご参照ください。

ALTER STREAMDROP STREAMSHOW STREAMS

このトピックの内容:

構文

-- Table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON TABLE <table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ APPEND_ONLY = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

-- External table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON EXTERNAL TABLE <external_table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ INSERT_ONLY = TRUE ]
  [ COMMENT = '<string_literal>' ]

バリアント構文

CREATE STREAM ... CLONE

ソースストリームと同じ定義で新しいストリームを作成します。クローンは、ソースストリームから現在の オフセット (つまり、テーブルの現在のトランザクションバージョン)を継承します。

CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream>
  [ COPY GRANTS ]
  [ ... ]

クローン作成の詳細については、 CREATE <オブジェクト> ... CLONE をご参照ください。

必須パラメーター

名前

ストリームの識別子(つまり、名前)を指定する文字列。ストリームが作成されるスキーマに対して一意である必要があります。

また、識別子はアルファベット文字で始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例: "My object")。二重引用符で囲まれた識別子も大文字と小文字が区別されます。

詳細については、 識別子の要件 をご参照ください。

テーブル名

ストリームによって変更が追跡されるテーブル(つまり、ソーステーブル)の識別子(つまり、名前)を指定する文字列。

アクセス制御

ストリームにクエリを実行するには、ロールに基になるテーブルに対する SELECT 権限が必要です。

外部テーブル名

ストリームによって変更が追跡される外部テーブル(つまり、ソース外部テーブル)の識別子(つまり、名前)を指定する文字列。

アクセス制御

ストリームにクエリを実行するには、ロールに基になる外部テーブルに対する SELECT 権限が必要です。

オプションのパラメーター

COPY GRANTS

次の CREATE STREAM バリアントのいずれかを使用して新しいストリームが作成されるときに、元のストリームからのアクセス許可を保持することを指定します。

  • CREATE OR REPLACE STREAM

  • CREATE STREAM ... CLONE

このパラメーターは、 OWNERSHIP を 除く すべての許可を既存のストリームから新しいストリームにコピーします。デフォルトでは、 CREATE STREAM コマンドを実行するロールが新しいストリームを所有します。

注釈

  • CREATE STREAM ステートメントが複数のストリーム(create or replace stream t1 clone t2; など)を参照する場合、 COPY GRANTS 句は置換されるストリームを優先します。

  • 置換ストリームの SHOW GRANTS 出力には、コピーされた権限の被付与者が、ステートメントが実行されたときの現在のタイムスタンプとともに、 CREATE STREAM ステートメントを実行したロールとしてリストされます。

  • 許可をコピーする操作は、 CREATE STREAM コマンドで(つまり、同じトランザクション内で)アトミックに発生します。

注釈

このパラメーターは現在サポートされていません。

AT | BEFORE TIMESTAMP => <タイムスタンプ> | OFFSET => <time_difference> | STATEMENT => <ID>

過去の特定の時間/ポイントでテーブルにストリームを作成します( Time Travel を使用)。 AT | BEFORE 句は、テーブルの履歴データが要求される過去のポイントを決定します。

  • AT キーワードは、指定されたパラメーターに等しいタイムスタンプを持つステートメントまたはトランザクションによる変更が、要求に含まれることを指定します。

  • BEFORE キーワードは、リクエストが指定されたパラメーターの直前のポイントを参照するように指定します。

注釈

現時点では、テーブルの変更追跡情報を記録する前に、テーブルにストリームを作成する必要があります。 AT | BEFORE 句で指定された過去の時点でテーブルにストリームが存在しなかった場合、 CREATE STREAM ステートメントは失敗します。変更追跡が記録される前の過去に、ストリームを作成することはできません。

APPEND_ONLY = TRUE | FALSE

これが追加専用ストリームかどうかを指定します。追加専用ストリームは、行の挿入のみを追跡します。更新および削除操作(テーブルの切り捨てを含む)は記録されません。例えば、10行がテーブルに挿入され、追加専用ストリームのオフセットが進む前にそれらの行の5が削除された場合、ストリームは10行を記録します。

このタイプのストリームは、標準ストリームよりもクエリのパフォーマンスを向上させ、抽出、ロード、変換(ELT)および行挿入のみに依存する同様のシナリオに非常に役立ちます。

標準ストリームは、変更セット内の削除された行と挿入された行を結合して、削除された行と更新された行を判別します。追加専用ストリームは追加された行のみを返すため、標準ストリームよりもはるかにパフォーマンスが向上します。例えば、追加専用ストリームの行が消費された直後にソーステーブルを切り捨てることができ、レコードの削除は、次にストリームがクエリまたは消費されるときにオーバーヘッドに寄与しません。

デフォルト

FALSE

INSERT_ONLY = TRUE | FALSE

これが挿入専用ストリームかどうかを指定します。挿入のみのストリームは、行の挿入のみを追跡します。挿入されたセットから行を削除する削除操作(つまり、何もしない)は記録されません。例えば、任意の2つのオフセットの間で、外部テーブルによって参照されるクラウドストレージの場所からFile1が削除され、File2が追加された場合、ストリームはFile2の行のレコードのみを返します。標準テーブルの CDC データを追跡する場合とは異なり、Snowflakeはクラウドストレージ内のファイルの履歴レコードにアクセスできません。

注釈

挿入のみのテーブルストリームのサポートは、 プレビュー機能 として提供されます。

デフォルト

FALSE

COMMENT = ' 文字列リテラル '

テーブルのコメントを指定する文字列(リテラル)。

デフォルト:値なし

出力

ストリームの出力には、ソーステーブルと同じ列と、次の追加の列が含まれます。

  • METADATA$ACTION:アクションを指定します(INSERT または DELETE)。

  • METADATA$ISUPDATE:記録されたアクション(INSERT または DELETE)がソーステーブルの行に適用される UPDATE の一部かどうかを指定します。

    ストリームは2つのオフセットの違いを記録することに注意してください。行が追加され、現在のオフセットで更新された場合、デルタの変更は新しい行になります。 METADATA$ISUPDATE 行には FALSE 値が記録されます。

  • METADATA$ROW_ID:行の一意かつ不変の ID を指定します。特定の行への変更を経時的に追跡するために使用できます。

使用上の注意

  • ストリームを作成するには、データベースとスキーマに対する USAGE 権限とともに、次の権限が明示的に付与されたロールが必要です。

    • スキーマ: CREATE STREAM

    • ソーステーブル: SELECT

  • 同じトランザクションで複数のオブジェクトを更新するためにストリームを複数回クエリすると、同じデータが返されます。

  • ストリームが DML ステートメントで使用されると、ストリームの位置(オフセット)が進みます。位置は、トランザクションの終了時にトランザクションの開始タイムスタンプに更新されます。ストリームは、ストリームの現在の位置から始まり、現在のトランザクションタイムスタンプで終わる変更レコードを記述します。

    複数のステートメントがストリーム内の同じ変更レコードにアクセスするようにするには、それらを明示的なトランザクションステートメント(BEGIN .. COMMIT)で囲みます。明示的なトランザクションがストリームをロックするため、ソーステーブルへの DML 更新は、トランザクションがコミットされるまでストリームに報告されません。

  • ストリームには、Fail-safe期間またはTime Travel保持期間はありません。ストリームがドロップされた場合、これらのオブジェクトのメタデータは回復できません。

  • テーブルの最初のストリームが作成されると、非表示の列のペアがテーブルに追加され、変更追跡メタデータの保存が開始されます。列は少量のストレージを消費します。

テーブルストリームの作成

mytable テーブルにストリームを作成します。

CREATE STREAM mystream ON TABLE mytable;

ソーステーブルでTime Travelを使用する

指定されたタイムスタンプの日付と時刻の前に存在していたように、 mytable テーブルにストリームを作成します。

CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));

指定されたタイムスタンプの正確な日時に存在していたように、 mytable テーブルにストリームを作成します。

CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));

5分前に存在していたように、 mytable テーブルにストリームを作成します。

CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);

mytable テーブルに、指定されたトランザクションまでに加えられた変更を含まないトランザクションを含むストリームを作成します。

CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');

外部テーブルでの挿入のみのストリームの作成

外部テーブルストリームを作成し、ストリーム内の変更データキャプチャレコードをクエリして、外部テーブルメタデータに追加されたレコードを追跡します。

-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE OR REPLACE EXTERNAL TABLE my_ext_table (
  date_part date as to_date(substr(metadata$filename, 11, 10), 'YYYY/MM/DD'),
  ts timestamp AS (value:time::timestamp),
  user_id varchar AS (value:userId::varchar),
  color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
  LOCATION=@my_ext_stage
  AUTO_REFRESH = false
  FILE_FORMAT=(TYPE=JSON);

-- Create a stream on the external table
CREATE OR REPLACE STREAM my_ext_table_stream ON EXTERNAL TABLE exttable_s3_part INSERT_ONLY = TRUE;

-- Execute SHOW streams
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on                    | name                   | database_name | schema_name | owner        | comment   | table_name                         | type  | stale | mode        |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM    | MYDB          | PUBLIC      | MYROLE       |           | MYDB.PUBLIC.EXTTABLE_S3_PART       | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+

-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.

-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;

-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE                                  | DATE_PART  | TS                      | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME                           |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| {                                      | 2020-08-05 | 2020-08-05 15:57:01.000 | user25  | green | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "green",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:57:01-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user25"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
| {                                      | 2020-08-05 | 2020-08-05 15:58:02.000 | user56  | brown | INSERT          | False             |                 | test/logs/2020/08/05/1408/log-08051409.json |
|   "color": "brown",                    |            |                         |         |       |                 |                   |                 |                                             |
|   "time": "2020-08-05 15:58:02-07:00", |            |                         |         |       |                 |                   |                 |                                             |
|   "userId": "user56"                   |            |                         |         |       |                 |                   |                 |                                             |
| }                                      |            |                         |         |       |                 |                   |                 |                                             |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+