PythonでSnowflakeストリームを管理する¶
Pythonを使用してSnowflakeストリームを管理することができます。これは、データ操作言語(DML)によるテーブルへの変更(挿入、更新、削除など)と、各変更に関するメタデータを記録するオブジェクトです。詳細については、 ストリームの紹介 をご参照ください。
注釈
ALTER STREAM は現在サポートされていません。
Snowflake Python APIs は、2つのタイプに分かれたストリームを表しています。
Stream
: ストリームの名前、ターゲット・ラグ、ウェアハウス、クエリ ステートメントなどのプロパティを公開します。StreamResource
: 対応するStream
オブジェクトの取得、ストリームの一時停止と再開、ストリームのドロップに使用できるメソッドを公開します。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
ストリームの作成¶
ストリームを作成するには、まず Stream
オブジェクトを作成して、 API Root
オブジェクトから StreamCollection
オブジェクトを作成します。 StreamCollection.create
を使って、新しいストリームをSnowflakeに追加します。
以下のオブジェクト・タイプでストリームを作成できます。
標準テーブル
ビュー
ディレクトリテーブル
ソーステーブル上で¶
以下の例のコードは、 my_db
データベースおよび my_schema
スキーマのソース・テーブル my_table
上の my_stream_on_table
という名前のストリームを表す Stream
オブジェクトを、指定されたストリーム・プロパティで作成します。
注釈
StreamSourceTable
タイプは標準的なテーブルのみをサポートします。動的テーブル、イベント・テーブル、外部テーブル、Icebergテーブルなど、その他のタイプのテーブルは現在サポートされていません。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
コードは、 StreamCollection
変数 streams
を作成し、 StreamCollection.create
を使用してSnowflakeに新しいストリームを作成します。
ソースビュー上で¶
以下の例のコードは、 my_db
データベースおよび my_schema
スキーマのソース・ビュー my_view
上の my_stream_on_view
という名前のストリームを表す Stream
オブジェクトを、指定されたストリーム・プロパティで作成します。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
ソース・ディレクトリ・テーブル上で¶
以下の例のコードは、 my_db
データベースと my_schema
スキーマのソース・ディレクトリ・テーブル my_directory_table
上の my_stream_on_directory_table
という名前のストリームを表す Stream
オブジェクトを、指定されたストリーム・プロパティで作成します。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
ストリームのクローニング¶
次の例のコードは、 my_db
データベースおよび my_schema
スキーマのソース・ストリーム my_other_stream
と同じ定義で、 my_stream
という新しいストリームを作成します。
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
ストリームの詳細を取得する¶
Stream
オブジェクトを返す StreamResource.fetch
メソッドを呼び出すことで、ストリームに関する情報を取得できます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_stream
というストリームの情報を取得します。
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
ストリームのリスト¶
Stream
オブジェクトの PagedIter
反復子を返す StreamCollection.iter
メソッドを使用して、ストリームを一覧表示することができます。
次の例のコードは、 my_db
データベースと my_schema
スキーマで、名前が my
で始まるストリームをリストし、それぞれの名前を表示します。
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
次の例のコードも、名前が my
で始まるストリームを一覧表示しますが、 like
の代わりに starts_with
パラメーターを使用します。この例では、結果の数を 10
に制限するために、オプションのパラメーター show_limit=10
も設定しています。
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
ストリームのドロップ¶
StreamResource
オブジェクトでストリームをドロップできます。
次の例のコードは、 my_stream
ストリーム・リソース・オブジェクトを取得し、ストリームを削除します。
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()