PythonでSnowflakeの動的テーブルを管理する¶
Pythonを使用して、Snowflake動的テーブルを管理することができます。これは、継続的処理パイプライン用の新しいテーブルタイプです。動的テーブル: 動的テーブルは指定されたクエリの結果をマテリアライズ化します。この機能の概要については、 動的テーブル をご参照ください。
Snowflake Python APIs は、動的テーブルを2つの別々のタイプで表します。
DynamicTable
: 動的テーブルの名前、ターゲット・ラグ、ウェアハウス、クエリ・ステートメントなどのプロパティを公開します。DynamicTableResource
: 対応するDynamicTable
オブジェクトの取得、動的テーブルの一時停止と再開、動的テーブルの削除に使用できるメソッドを公開します。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
動的テーブルの作成¶
動的テーブルを作成するには、まず DynamicTable
オブジェクトを作成して、 API Root
オブジェクトから DynamicTableCollection
オブジェクトを作成します。 DynamicTableCollection.create
を使用して、新しい動的テーブルをSnowflakeに追加します。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_dynamic_table
という名前の動的テーブルを表す DynamicTable
オブジェクトを、必要最小限のオプションを指定して作成します。
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
コードは、 DynamicTableCollection
変数 dynamic_tables
を作成し、 DynamicTableCollection.create
を使用してSnowflakeに新しい動的テーブルを作成します。
次の例のコードは、 my_db
データベースの my_dynamic_table2
という名前の動的テーブルと、現在可能なすべてのオプションが指定された my_schema
スキーマを表す DynamicTable
オブジェクトを作成します。
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
動的テーブルのクローニング¶
次の例のコードは、 my_db
データベースと my_schema
スキーマのソース・動的テーブル my_dynamic_table
から、同じ列定義とすべての既存データを持つ my_dynamic_table2
という名前の新しい動的テーブルを作成します。
注釈
このクローンオペレーションは、
DynamicTableClone
オブジェクトを使用します。このオブジェクトには、オプションのtarget_lag
とwarehouse
パラメーターが含まれており、現在のところ他のパラメーターはサポートしていません。
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
この機能の詳細については、 CREATE DYNAMIC TABLE ... CLONE をご参照ください。
動的テーブルの詳細の取得¶
DynamicTable
オブジェクトを返す DynamicTableResource.fetch
メソッドを呼び出すことで、動的テーブルに関する情報を取得できます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_dynamic_table
という名前の動的テーブルの情報を取得します。
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
動的テーブルの一覧表示¶
DynamicTable
オブジェクトの PagedIter
反復子を返す DynamicTableCollection.iter
メソッドを使用して、動的テーブルを一覧表示することができます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my
というテキストで始まる名前を持つダイナミック・テーブルを一覧表示し、それぞれの名前を表示します。
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
動的なテーブル操作の実行¶
動的テーブルのリフレッシュ、一時停止、再開など、動的テーブルの一般的な操作は DynamicTableResource
オブジェクトで実行できます。
動的テーブル・リソースでできる操作をいくつか示すために、次の例のコードでは次のような操作を行っています。
my_dynamic_table
動的テーブル・リソース・オブジェクトを取得します。動的テーブルをリフレッシュします。
動的テーブルを中断します。
動的テーブルを再開します。
動的テーブルをドロップします。
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()