Verwalten von dynamischen Snowflake-Tabellen mit Python¶
Sie können Python verwenden, um dynamische Snowflake-Tabellen zu verwalten, einen neuen Tabellentyp für kontinuierliche Verarbeitungspipelines. Dynamische Tabellen materialisieren die Ergebnisse einer bestimmten Abfrage. Einen Überblick über dieses Feature finden Sie unter Dynamische Tabellen.
Die Snowflake Python APIs stellt dynamische Tabellen mit zwei verschiedenen Typen dar:
DynamicTable
: Zeigt die Eigenschaften einer dynamischen Tabelle an, z. B. ihren Namen, ihr Ziel, ihr Warehouse und ihre Anweisung zur Abfrage.DynamicTableResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesDynamicTable
-Objekt abrufen, die dynamische Tabelle aussetzen und wieder aufnehmen und die dynamische Tabelle löschen können.
Voraussetzungen¶
Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root
-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.
Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Unter Verwendung des resultierenden Session
-Objekts erstellt der Code ein Root
-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.
Erstellen einer dynamischen Tabelle¶
Um eine dynamische Tabelle zu erstellen, legen Sie zunächst ein DynamicTable
-Objekt an und erstellen dann ein DynamicTableCollection
-Objekt aus dem API Root
-Objekt. Verwenden Sie DynamicTableCollection.create
, um die neue dynamische Tabelle zu Snowflake hinzuzufügen.
Der Code im folgenden Beispiel erstellt ein DynamicTable
-Objekt, das eine dynamische Tabelle mit dem Namen my_dynamic_table
in der my_db
-Datenbank und dem my_schema
-Schema darstellt, wobei die minimal erforderlichen Optionen angegeben sind:
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Der Code erstellt eine DynamicTableCollection
-Variable dynamic_tables
und verwendet DynamicTableCollection.create
, um eine neue dynamische Tabelle in Snowflake zu erstellen.
Der Code im folgenden Beispiel erstellt ein DynamicTable
-Objekt, das eine dynamische Tabelle namens my_dynamic_table2
in der my_db
-Datenbank und das my_schema
-Schema mit allen derzeit möglichen Optionen darstellt:
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
Klonen einer dynamischen Tabelle¶
Der Code im folgenden Beispiel erstellt eine neue dynamische Tabelle namens my_dynamic_table2
mit denselben Spaltendefinitionen und allen vorhandenen Daten aus der dynamischen my_dynamic_table
-Quelltabelle in der my_db
-Datenbank und dem my_schema
-Schema:
Bemerkung
Diese Operation des Klonens verwendet das
DynamicTableClone
-Objekt, das die optionalen Parametertarget_lag
undwarehouse
enthält, und unterstützt derzeit keine anderen Parameter.
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
Weitere Informationen zu dieser Funktionalität finden Sie unter CREATE DYNAMIC TABLE … CLONE.
Abrufen dynamischr Tabellendetails¶
Sie können Informationen über eine dynamische Tabelle erhalten, indem Sie die DynamicTableResource.fetch
-Methode aufrufen, die ein DynamicTable
-Objekt zurückgibt.
Der Code im folgenden Beispiel ruft Informationen über eine dynamische Tabelle namens my_dynamic_table
in der my_db
-Datenbank und das my_schema
-Schema ab:
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Auflistung von dynamischen Tabellen¶
Sie können dynamische Tabellen mit der DynamicTableCollection.iter
-Methode auflisten, die einen PagedIter
-Iterator von DynamicTable
-Objekten zurückgibt.
Der Code im folgenden Beispiel listet dynamische Tabellen auf, deren Name mit dem Text my
in der my_db
-Datenbank und dem my_schema
-Schema beginnt, und gibt dann den Namen jeder Tabelle aus:
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
Durchführen von Operationen in dynamischer Tabelle¶
Sie können gängige Operationen für dynamische Tabellen, wie z. B. das Aktualisieren, Aussetzen und Wiederaufnehmen einer dynamischen Tabelle, mit einem DynamicTableResource
-Objekt durchführen.
Um einige Operationen zu demonstrieren, die Sie mit einer dynamischen Tabellenressource durchführen können, führt der Code im folgenden Beispiel Folgendes aus:
Ruft das
my_dynamic_table
-Ressourcen-Objekt für dynamische Tabellen ab.Aktualisiert die dynamische Tabelle.
Setzt die dynamische Tabelle aus.
Setzt die dynamische Tabelle fort.
Löscht die dynamische Tabelle.
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()