PythonでSnowflakeストリームを管理する¶
Pythonを使用してSnowflakeストリームを管理することができます。これは、データ操作言語(DML)によるテーブルへの変更(挿入、更新、削除など)と、各変更に関するメタデータを記録するオブジェクトです。詳細については、 ストリームの紹介 をご参照ください。
注釈
ALTER STREAM は現在サポートされていません。
Snowflake Python APIs は、2つのタイプに分かれたストリームを表しています。
Stream: ストリームの名前、ターゲット・ラグ、ウェアハウス、クエリ ステートメントなどのプロパティを公開します。StreamResource: 対応するStreamオブジェクトの取得、ストリームの一時停止と再開、ストリームのドロップに使用できるメソッドを公開します。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
ストリームの作成¶
ストリームを作成するには、まず Stream オブジェクトを作成して、 API Root オブジェクトから StreamCollection オブジェクトを作成します。 StreamCollection.create を使って、新しいストリームをSnowflakeに追加します。
以下のオブジェクト・タイプでストリームを作成できます。
標準テーブル
ビュー
ディレクトリテーブル
ソーステーブル上で¶
以下の例のコードは、 my_db データベースおよび my_schema スキーマのソース・テーブル my_table 上の my_stream_on_table という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。
注釈
StreamSourceTable タイプは標準的なテーブルのみをサポートします。動的テーブル、イベント・テーブル、外部テーブル、Icebergテーブルなど、その他のタイプのテーブルは現在サポートされていません。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
コードは、 StreamCollection 変数 streams を作成し、 StreamCollection.create を使用してSnowflakeに新しいストリームを作成します。
ソースビュー上で¶
以下の例のコードは、 my_db データベースおよび my_schema スキーマのソース・ビュー my_view 上の my_stream_on_view という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
ソース・ディレクトリ・テーブル上で¶
以下の例のコードは、 my_db データベースと my_schema スキーマのソース・ディレクトリ・テーブル my_directory_table 上の my_stream_on_directory_table という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
ストリームのクローニング¶
次の例のコードは、 my_db データベースおよび my_schema スキーマのソース・ストリーム my_other_stream と同じ定義で、 my_stream という新しいストリームを作成します。
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
ストリームの詳細を取得する¶
Stream オブジェクトを返す StreamResource.fetch メソッドを呼び出すことで、ストリームに関する情報を取得できます。
次の例のコードは、 my_db データベースと my_schema スキーマの my_stream というストリームの情報を取得します。
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
ストリームのリスト¶
Stream オブジェクトの PagedIter 反復子を返す StreamCollection.iter メソッドを使用して、ストリームを一覧表示することができます。
次の例のコードは、 my_db データベースと my_schema スキーマで、名前が my で始まるストリームをリストし、それぞれの名前を表示します。
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
次の例のコードも、名前が my で始まるストリームを一覧表示しますが、 like の代わりに starts_with パラメーターを使用します。この例では、結果の数を 10 に制限するために、オプションのパラメーター show_limit=10 も設定しています。
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
ストリームのドロップ¶
StreamResource オブジェクトでストリームをドロップできます。
次の例のコードは、 my_stream ストリーム・リソース・オブジェクトを取得し、ストリームを削除します。
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()