PythonでSnowflakeストリームを管理する

Pythonを使用してSnowflakeストリームを管理することができます。これは、データ操作言語(DML)によるテーブルへの変更(挿入、更新、削除など)と、各変更に関するメタデータを記録するオブジェクトです。詳細については、 ストリームの紹介 をご参照ください。

注釈

ALTER STREAM は現在サポートされていません。

Snowflake Python APIs は、2つのタイプに分かれたストリームを表しています。

  • Stream: ストリームの名前、ターゲット・ラグ、ウェアハウス、クエリ ステートメントなどのプロパティを公開します。

  • StreamResource: 対応する Stream オブジェクトの取得、ストリームの一時停止と再開、ストリームのドロップに使用できるメソッドを公開します。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

ストリームの作成

ストリームを作成するには、まず Stream オブジェクトを作成して、 API Root オブジェクトから StreamCollection オブジェクトを作成します。 StreamCollection.create を使って、新しいストリームをSnowflakeに追加します。

以下のオブジェクト・タイプでストリームを作成できます。

  • 標準テーブル

  • ビュー

  • ディレクトリテーブル

ソーステーブル上で

以下の例のコードは、 my_db データベースおよび my_schema スキーマのソース・テーブル my_table 上の my_stream_on_table という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。

注釈

StreamSourceTable タイプは標準的なテーブルのみをサポートします。動的テーブル、イベント・テーブル、外部テーブル、Icebergテーブルなど、その他のタイプのテーブルは現在サポートされていません。

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

コードは、 StreamCollection 変数 streams を作成し、 StreamCollection.create を使用してSnowflakeに新しいストリームを作成します。

ソースビュー上で

以下の例のコードは、 my_db データベースおよび my_schema スキーマのソース・ビュー my_view 上の my_stream_on_view という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

ソース・ディレクトリ・テーブル上で

以下の例のコードは、 my_db データベースと my_schema スキーマのソース・ディレクトリ・テーブル my_directory_table 上の my_stream_on_directory_table という名前のストリームを表す Stream オブジェクトを、指定されたストリーム・プロパティで作成します。

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

ストリームのクローニング

次の例のコードは、 my_db データベースおよび my_schema スキーマのソース・ストリーム my_other_stream と同じ定義で、 my_stream という新しいストリームを作成します。

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

ストリームの詳細を取得する

Stream オブジェクトを返す StreamResource.fetch メソッドを呼び出すことで、ストリームに関する情報を取得できます。

次の例のコードは、 my_db データベースと my_schema スキーマの my_stream というストリームの情報を取得します。

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

ストリームのリスト

Stream オブジェクトの PagedIter 反復子を返す StreamCollection.iter メソッドを使用して、ストリームを一覧表示することができます。

次の例のコードは、 my_db データベースと my_schema スキーマで、名前が my で始まるストリームをリストし、それぞれの名前を表示します。

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

次の例のコードも、名前が my で始まるストリームを一覧表示しますが、 like の代わりに starts_with パラメーターを使用します。この例では、結果の数を 10 に制限するために、オプションのパラメーター show_limit=10 も設定しています。

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

ストリームのドロップ

StreamResource オブジェクトでストリームをドロップできます。

次の例のコードは、 my_stream ストリーム・リソース・オブジェクトを取得し、ストリームを削除します。

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy