CREATE STREAM¶
現在/指定されたスキーマで新しいストリームを作成するか、既存の ストリーム を置き換えます。ストリームは、テーブル、ディレクトリテーブル、外部テーブル、またはビュー内の基になるテーブル(セキュアビューを含む)に対して行われたデータ操作言語(DML)の変更を記録します。変更が記録されるオブジェクトは、 ソースオブジェクト と呼ばれます。
さらに、このコマンドは次のバリアントをサポートしています。
CREATE STREAM ... CLONE (既存のストリームのクローンを作成)
- こちらもご参照ください。
構文¶
コマンド構文は、ストリームが作成されるオブジェクトによって異なります。
-- table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ COPY GRANTS ]
ON TABLE <table_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ APPEND_ONLY = TRUE | FALSE ]
[ SHOW_INITIAL_ROWS = TRUE | FALSE ]
[ COMMENT = '<string_literal>' ]
-- External table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ COPY GRANTS ]
ON EXTERNAL TABLE <external_table_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ INSERT_ONLY = TRUE ]
[ COMMENT = '<string_literal>' ]
-- Directory table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ COPY GRANTS ]
ON STAGE <stage_name>
[ COMMENT = '<string_literal>' ]
-- View
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ COPY GRANTS ]
ON VIEW <view_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ APPEND_ONLY = TRUE | FALSE ]
[ SHOW_INITIAL_ROWS = TRUE | FALSE ]
[ COMMENT = '<string_literal>' ]
バリアント構文¶
CREATE STREAM ... CLONE
ソースストリームと同じ定義で新しいストリームを作成します。クローンは、ソースストリームから現在の オフセット (つまり、現在のトランザクション テーブルバージョン)を継承します。
CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream> [ COPY GRANTS ] [ ... ]
クローン作成の詳細については、 CREATE <オブジェクト> ... CLONE をご参照ください。
必須パラメーター¶
name
ストリームの識別子(つまり、名前)を指定する文字列。ストリームが作成されるスキーマに対して一意である必要があります。
また、識別子はアルファベット文字で始まる必要があり、識別子文字列全体が二重引用符で囲まれていない限り、スペースや特殊文字を含めることはできません(例:
"My object"
)。二重引用符で囲まれた識別子も大文字と小文字が区別されます。詳細については、 識別子の要件 をご参照ください。
table_name
ストリームによって変更が追跡されるテーブル(つまり、ソーステーブル)の識別子(つまり、名前)を指定する文字列。
- アクセス制御:
ストリームにクエリを実行するには、ロールに基になるテーブルに対する SELECT 権限が必要です。
external_table_name
ストリームによって変更が追跡される外部テーブル(つまり、ソース外部テーブル)の識別子(つまり、名前)を指定する文字列。
- アクセス制御:
ストリームにクエリを実行するには、ロールに基になる外部テーブルに対する SELECT 権限が必要です。
stage_name
ストリームによって変更が追跡されるディレクトリテーブル(つまり、ソースディレクトリテーブル)のステージの識別子(つまり、名前)を指定する文字列。
- アクセス制御:
ストリームをクエリするには、基になるステージに対する USAGE (外部ステージ)または READ (内部ステージ)権限がロールに必要です。
view_name
ソースビューの識別子(つまり、名前)を指定する文字列。ストリームは、ビューの基になるテーブルへの DML の変更を追跡します。
ビューのストリームの詳細については、 Streams on Views をご参照ください。
- アクセス制御:
ストリームをクエリするには、ビューに対する SELECT 権限が必要です。
オプションのパラメーター¶
COPY GRANTS
次の CREATE STREAM バリアントのいずれかを使用して新しいストリームが作成されるときに、元のストリームからのアクセス許可を保持することを指定します。
CREATE OR REPLACE STREAM
CREATE STREAM ... CLONE
このパラメーターは、 OWNERSHIP を 除く すべての許可を既存のストリームから新しいストリームにコピーします。デフォルトでは、 CREATE STREAM コマンドを実行するロールが新しいストリームを所有します。
注釈
CREATE STREAM ステートメントが複数のストリーム(
create or replace stream t1 clone t2;
など)を参照する場合、COPY GRANTS
句は置換されるストリームを優先します。置換ストリームの SHOW GRANTS 出力には、コピーされた権限の被付与者が、ステートメントが実行されたときの現在のタイムスタンプとともに、 CREATE STREAM ステートメントを実行したロールとしてリストされます。
許可をコピーする操作は、 CREATE STREAM コマンドで(つまり、同じトランザクション内で)アトミックに発生します。
注釈
このパラメーターは現在サポートされていません。
AT ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' ) | BEFORE ( TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> ) }
過去の特定の時間/ポイントでストリームを作成します( Time Travel を使用)。 AT | BEFORE 句は、履歴データがリクエストされる過去のポイントを決定します。
AT
キーワードは、指定されたパラメーターに等しいタイムスタンプを持つステートメントまたはトランザクションによる変更がリクエストに含まれることを指定します。STREAM => '<名前>'
の値は特殊です。CREATE STREAM ステートメントを指定すると、指定したストリームと同じオフセットに新しいストリームが作成されます。既存のストリームを再作成するときに(OR REPLACE
キーワードを使用して)この値を指定して、再作成後にストリームの現在のオフセットを保持することもできます。'<名前>'
は、オフセットが新しいストリームまたは再作成されたストリームにコピーされる既存のストリームの識別子(つまり、名前)です。ストリームが DML トランザクションで使用される場合、新しいストリームまたは再作成されたストリームは、通常どおりオフセットを進めます。
BEFORE
キーワードは、リクエストが指定されたパラメーターの直前のポイントを参照するように指定します。
注釈
AT | BEFORE 句で指定された過去のポイントで、ソースオブジェクトに使用可能な変更追跡データがない場合、 CREATE STREAM ステートメントは失敗します。変更追跡が記録される前の過去に、ストリームを作成することはできません。
APPEND_ONLY = TRUE | FALSE
標準テーブルをクエリする、標準テーブルのストリームまたはビューのストリームのみがサポートされます。
これが追加専用ストリームかどうかを指定します。追加専用ストリームは、行の挿入のみを追跡します。更新および削除操作(テーブルの切り捨てを含む)は記録されません。例えば、10行がテーブルに挿入され、追加専用ストリームのオフセットが進む前にそれらの行の5が削除された場合、ストリームは10行を記録します。
このタイプのストリームは、標準ストリームよりもクエリのパフォーマンスを向上させ、抽出、ロード、変換(ELT)および行挿入のみに依存する同様のシナリオに非常に役立ちます。
標準ストリームは、変更セット内の削除された行と挿入された行を結合して、削除された行と更新された行を判別します。追加専用ストリームは追加された行のみを返すため、標準ストリームよりもはるかにパフォーマンスが向上します。例えば、追加専用ストリームの行が消費された直後にソーステーブルを切り捨てることができ、レコードの削除は、次にストリームがクエリまたは消費されるときにオーバーヘッドに寄与しません。
- デフォルト:
FALSE
INSERT_ONLY = TRUE | FALSE
外部テーブルのストリームに必要です。他のオブジェクトのストリームではサポートされていません。
これが挿入専用ストリームかどうかを指定します。挿入専用ストリームは、行の挿入のみを追跡します。挿入されたセットから行を削除する削除操作は記録されません(つまり、操作なし)。例えば、任意の2つのオフセットの間で、外部テーブルによって参照されるクラウドストレージの場所から
File1
が削除され、File2
が追加された場合、ストリームはFile2
の行のレコードのみを返します。標準テーブルの変更データキャプチャ(CDC)データを追跡する場合とは異なり、Snowflakeはクラウドストレージ内のファイルの履歴記録にアクセスできません。- デフォルト:
FALSE
SHOW_INITIAL_ROWS = TRUE | FALSE
ストリームが最初に消費されたときに返す記録を指定します。
TRUE
ストリームは、ストリームが作成された時点でソースオブジェクトに存在していた行 のみ を返します。METADATA$ISUPDATE 列は、これらの行に FALSE 値を示しています。その後、ストリームは、最新のオフセット以降におけるソースオブジェクトへの DML 変更を返します。つまり、通常のストリームの動作です。
このパラメーターを使用すると、ストリームのソースオブジェクトの内容を使用してダウンストリームプロセスを初期化できます。
FALSE
ストリームは、最新のオフセット以降におけるソースオブジェクトへの DML 変更を返します。
- デフォルト:
FALSE
COMMENT = 'string_literal'
ストリームのコメントを指定する文字列(リテラル)。
デフォルト: 値なし
出力¶
ストリームの出力には、ソースオブジェクトと同じ列と、次の追加の列が含まれます。
METADATA$ACTION:アクションを指定します(INSERT または DELETE)。
METADATA$ISUPDATE: 記録されたアクション(INSERT または DELETE)が、ソーステーブルまたはビューの行に適用される UPDATE の一部かどうかを指定します。
ストリームは2つのオフセットの違いを記録することに注意してください。行が追加され、現在のオフセットで更新された場合、デルタの変更は新しい行になります。 METADATA$ISUPDATE 行には FALSE 値が記録されます。
METADATA$ROW_ID:行の一意かつ不変の ID を指定します。特定の行への変更を経時的に追跡するために使用できます。
アクセス制御の要件¶
この SQL コマンドの実行に使用される ロール には、少なくとも次の 権限 が必要です。
標準テーブルのストリーム:
オブジェクト
権限
メモ
スキーマ
CREATE STREAM
テーブル
SELECT
ソーステーブルで変更の追跡が有効になっていない場合(ALTER TABLE ... SET CHANGE_TRACKING = TRUE を使用)は、テーブルの所有者(つまり、テーブルに対する OWNERSHIP 権限を持つロール)のみが、テーブルの最初のストリームを作成できます。最初のストリームを作成すると、テーブルでの変更の追跡が自動で有効になります。
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
ビューのストリーム:
オブジェクト
権限
メモ
スキーマ
CREATE STREAM
ビュー
SELECT
ソースビューとその基になるテーブルで変更の追跡が有効になっていない場合は、ビューとその基になるテーブルの所有者に対して OWNERSHIP 権限を持つロールのみが、ビューに初期ストリームを作成できます。最初のストリームを作成すると、テーブルでの変更の追跡が自動で有効になります。ビューとその基になるテーブルで変更の追跡を有効にする手順については、 ビューと基になるテーブルの変更追跡の有効化 をご参照ください。変更追跡を有効にすると、変更追跡が有効になっている間、基になるテーブルがロックされることに注意してください。基にあるオブジェクトのロックにより、これらのオブジェクトの DDL/DML 操作で遅延が発生する可能性があります。詳細については、 リソースのロック をご参照ください。
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
ディレクトリテーブルのストリーム:
オブジェクト
権限
メモ
スキーマ
CREATE STREAM
ステージ
USAGE (外部ステージ)または READ (内部ステージ)
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
外部テーブルのテーブルストリーム:
オブジェクト
権限
メモ
スキーマ
CREATE STREAM
外部テーブル
SELECT
スキーマ内の任意のオブジェクトを操作するには、親データベースとスキーマに対する USAGE 権限も必要であることに注意してください。
指定された権限のセットを使用してカスタムロールを作成する手順については、 カスタムロールの作成 をご参照ください。
セキュリティ保護可能なオブジェクト に対して SQL アクションを実行するためのロールと権限付与に関する一般的な情報については、 アクセス制御の概要 をご参照ください。
使用上の注意¶
同じトランザクションで複数のオブジェクトを更新するためにストリームを複数回クエリすると、同じデータが返されます。
ストリームが DML ステートメントで使用されると、ストリームの位置(オフセット)が進みます。位置は、トランザクションの終了時にトランザクションの開始タイムスタンプに更新されます。ストリームは、ストリームの現在の位置から始まり、現在のトランザクションタイムスタンプで終わる変更レコードを記述します。
複数のステートメントがストリーム内の同じ変更レコードにアクセスするようにするには、それらを明示的なトランザクションステートメント(BEGIN .. COMMIT)で囲みます。明示的なトランザクションがストリームをロックするため、ソースオブジェクトへの DML 更新は、トランザクションがコミットされるまでストリームに報告されません。
ストリームには、Fail-safe期間またはTime Travel保持期間はありません。ストリームがドロップされた場合、これらのオブジェクトのメタデータは回復できません。
共有テーブルのストリーム:
ソーステーブルの保持期間では、テーブル上のストリームが古くなることを防ぐ自動延長は ありません。
標準ストリームは、地理空間データの変更データを取得できません。地理空間データを含むオブジェクトに追加専用ストリームを作成することをお勧めします。
ビューのストリーム:
ビュー所有者のロール(つまり、ビューに対する OWNERSHIP 権限を持つロール)を使用してビューに最初のストリームを作成すると、ビューでの変更追跡が可能になります。同じロールが基になるテーブルも所有している場合、変更追跡もテーブルで有効になります。ビューとその基になるテーブルの両方でロールに OWNERSHIP 権限が付与されていない場合は、該当するオブジェクトで変更追跡を手動で有効にする必要があります。手順については、 ビューと基になるテーブルの変更追跡の有効化 をご参照ください。
ビュー内の結合の数によっては、基になるテーブルを1回変更すると、ストリーム出力に多数の変更が生じる可能性があります。
ソースビューまたは基になるテーブルがドロップまたは再作成されると(CREATE OR REPLACE VIEW を使用)、特定のビューのストリームが中断します。
セキュアビュー上のすべてのストリームは、セキュアビューの制約に従います。
非セキュアビューの所有者(つまり、ビューに対する OWNERSHIP 権限を持つロール)がビューをセキュアビューに変更した場合(ALTER VIEW ... SET SECURE を使用)、ビュー上のストリームは自動的にセキュアビューの制約を強制します。
さらに、基になるテーブルの保持期間では、セキュアビューのストリームが古くなることを防ぐ自動延長は ありません。
ビューが非決定的関数を使用するビューに基づくストリームは、非決定論的結果を返す可能性があります。
たとえば、 CURRENT_DATE や CURRENT_USER などの コンテキスト関数 の結果は非決定的です。 RANDOM などの データ生成関数 の結果も非決定的です。ビューに非決定的関数が含まれている場合、そのビューのストリームは、関数の出力の定数スナップショットにはなりません。代わりに、ストリーム内の値はクエリ時に変更される場合があります。
ビューの結果の非決定性が、ストリームクエリの結果の正確さに影響を与えないようにすることをお勧めします。
例については、 非決定的 SQL 関数を呼び出すビューでのストリーム をご参照ください。
ディレクトリテーブルのストリーム: ストリーム出力の METADATA$ROW_ID 列の値は空です。
メタデータについて。
注意
Snowflakeサービスを使用する場合、お客様は、個人データ(ユーザーオブジェクト向け以外)、機密データ、輸出管理データ、またはその他の規制されたデータがメタデータとして入力されていないことを確認する必要があります。詳細については、 Snowflakeのメタデータフィールド をご参照ください。
CREATE OR REPLACE <オブジェクト> ステートメントはアトミックです。つまり、オブジェクトが置き換えられると、単一のトランザクションで、古いオブジェクトが削除されて新しいオブジェクトが作成されます。
例¶
テーブルストリームの作成¶
mytable
テーブルにストリームを作成します。
CREATE STREAM mystream ON TABLE mytable;
ソーステーブルでTime Travelを使用する¶
指定されたタイムスタンプの日付と時刻の前に存在していたように、 mytable
テーブルにストリームを作成します。
CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));
指定されたタイムスタンプの正確な日時に存在していたように、 mytable
テーブルにストリームを作成します。
CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));
5分前に存在していたように、 mytable
テーブルにストリームを作成します。
CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);
同じソーステーブル上の既存のストリーム oldstream
と同じオフセットで、 mytable
テーブルにストリームを作成します。
CREATE STREAM mystream ON TABLE mytable AT(STREAM => 'oldstream');
既存の mystream
ストリームを再作成しますが、現在のオフセットは保持します。
CREATE OR REPLACE STREAM mystream ON TABLE mytable AT(STREAM => 'mystream');
mytable
テーブルに、指定されたトランザクションまでに加えられた変更を含まないトランザクションを含むストリームを作成します。
CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');
単一テーブルビューでのストリームの作成¶
myview
ビューでストリームを作成します。
CREATE STREAM mystream ON VIEW myview;
その他の例については、 ストリームの例 をご参照ください。
外部テーブルでの挿入のみのストリームの作成¶
外部テーブルストリームを作成し、ストリーム内の変更データキャプチャ記録をクエリして、外部テーブルメタデータに追加された記録を追跡します。
-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE EXTERNAL TABLE my_ext_table (
date_part date as to_date(substr(metadata$filename, 1, 10), 'YYYY/MM/DD'),
ts timestamp AS (value:time::timestamp),
user_id varchar AS (value:userId::varchar),
color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
LOCATION=@my_ext_stage
AUTO_REFRESH = false
FILE_FORMAT=(TYPE=JSON);
-- Create a stream on the external table
CREATE STREAM my_ext_table_stream ON EXTERNAL TABLE my_ext_table INSERT_ONLY = TRUE;
-- Execute SHOW streams
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on | name | database_name | schema_name | owner | comment | table_name | type | stale | mode |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM | MYDB | PUBLIC | MYROLE | | MYDB.PUBLIC.EXTTABLE_S3_PART | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.
-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;
-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE | DATE_PART | TS | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| { | 2020-08-05 | 2020-08-05 15:57:01.000 | user25 | green | INSERT | False | | test/logs/2020/08/05/1408/log-08051409.json |
| "color": "green", | | | | | | | | |
| "time": "2020-08-05 15:57:01-07:00", | | | | | | | | |
| "userId": "user25" | | | | | | | | |
| } | | | | | | | | |
| { | 2020-08-05 | 2020-08-05 15:58:02.000 | user56 | brown | INSERT | False | | test/logs/2020/08/05/1408/log-08051409.json |
| "color": "brown", | | | | | | | | |
| "time": "2020-08-05 15:58:02-07:00", | | | | | | | | |
| "userId": "user56" | | | | | | | | |
| } | | | | | | | | |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
ディレクトリテーブルでの標準ストリームの作成¶
mystage
という名前のステージのディレクトリテーブルにストリームを作成します。
CREATE STREAM dirtable_mystage_s ON STAGE mystage;
ディレクトリテーブルのメタデータを手動で更新して、ストリームにデータを入力します。
ALTER STAGE mystage REFRESH;
ストリームに対する最新のオフセットの後に1つ以上のファイルがステージに追加された後、ストリームをクエリします。
SELECT * FROM dirtable_mystage_s;
+-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------+
| RELATIVE_PATH | SIZE | LAST_MODIFIED | MD5 | ETAG | FILE_URL | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------|
| file1.csv.gz | 1048 | 2021-05-14 06:09:08.000 -0700 | c98f600c492c39bef249e2fcc7a4b6fe | c98f600c492c39bef249e2fcc7a4b6fe | https://myaccount.snowflakecomputing.com/api/files/MYDB/MYSCHEMA/MYSTAGE/file1%2ecsv%2egz | INSERT | False | |
| file2.csv.gz | 3495 | 2021-05-14 06:09:09.000 -0700 | 7f1a4f98ef4c7c42a2974504d11b0e20 | 7f1a4f98ef4c7c42a2974504d11b0e20 | https://myaccount.snowflakecomputing.com/api/files/MYDB/MYSCHEMA/MYSTAGE/file2%2ecsv%2egz | INSERT | False | |
+-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------+