PythonでSnowflakeの動的テーブルを管理する

Pythonを使用して、Snowflake動的テーブルを管理することができます。これは、継続的処理パイプライン用の新しいテーブルタイプです。動的テーブル: 動的テーブルは指定されたクエリの結果をマテリアライズ化します。この機能の概要については、 動的テーブル をご参照ください。

Snowflake Python APIs は、動的テーブルを2つの別々のタイプで表します。

  • DynamicTable: 動的テーブルの名前、ターゲット・ラグ、ウェアハウス、クエリ・ステートメントなどのプロパティを公開します。

  • DynamicTableResource: 対応する DynamicTable オブジェクトの取得、動的テーブルの一時停止と再開、動的テーブルの削除に使用できるメソッドを公開します。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

動的テーブルの作成

動的テーブルを作成するには、まず DynamicTable オブジェクトを作成して、 API Root オブジェクトから DynamicTableCollection オブジェクトを作成します。 DynamicTableCollection.create を使用して、新しい動的テーブルをSnowflakeに追加します。

次の例のコードは、 my_db データベースと my_schema スキーマの my_dynamic_table という名前の動的テーブルを表す DynamicTable オブジェクトを、必要最小限のオプションを指定して作成します。

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

コードは、 DynamicTableCollection 変数 dynamic_tables を作成し、 DynamicTableCollection.create を使用してSnowflakeに新しい動的テーブルを作成します。

次の例のコードは、 my_db データベースの my_dynamic_table2 という名前の動的テーブルと、現在可能なすべてのオプションが指定された my_schema スキーマを表す DynamicTable オブジェクトを作成します。

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

動的テーブルのクローニング

次の例のコードは、 my_db データベースと my_schema スキーマのソース・動的テーブル my_dynamic_table から、同じ列定義とすべての既存データを持つ my_dynamic_table2 という名前の新しい動的テーブルを作成します。

注釈

このクローンオペレーションは、 DynamicTableClone オブジェクトを使用します。このオブジェクトには、オプションの target_lagwarehouse パラメーターが含まれており、現在のところ他のパラメーターはサポートしていません。

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

この機能の詳細については、 CREATE DYNAMIC TABLE ... CLONE をご参照ください。

動的テーブルの詳細の取得

DynamicTable オブジェクトを返す DynamicTableResource.fetch メソッドを呼び出すことで、動的テーブルに関する情報を取得できます。

次の例のコードは、 my_db データベースと my_schema スキーマの my_dynamic_table という名前の動的テーブルの情報を取得します。

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

動的テーブルの一覧表示

DynamicTable オブジェクトの PagedIter 反復子を返す DynamicTableCollection.iter メソッドを使用して、動的テーブルを一覧表示することができます。

次の例のコードは、 my_db データベースと my_schema スキーマの my というテキストで始まる名前を持つダイナミック・テーブルを一覧表示し、それぞれの名前を表示します。

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

動的なテーブル操作の実行

動的テーブルのリフレッシュ、一時停止、再開など、動的テーブルの一般的な操作は DynamicTableResource オブジェクトで実行できます。

動的テーブル・リソースでできる操作をいくつか示すために、次の例のコードでは次のような操作を行っています。

  1. my_dynamic_table 動的テーブル・リソース・オブジェクトを取得します。

  2. 動的テーブルをリフレッシュします。

  3. 動的テーブルを中断します。

  4. 動的テーブルを再開します。

  5. 動的テーブルをドロップします。

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
Copy