Verwalten von dynamischen Snowflake-Tabellen mit Python

Sie können Python verwenden, um dynamische Snowflake-Tabellen zu verwalten, einen neuen Tabellentyp für kontinuierliche Verarbeitungspipelines. Dynamische Tabellen materialisieren die Ergebnisse einer bestimmten Abfrage. Einen Überblick über dieses Feature finden Sie unter Dynamische Tabellen.

Die Snowflake Python APIs stellt dynamische Tabellen mit zwei verschiedenen Typen dar:

  • DynamicTable: Zeigt die Eigenschaften einer dynamischen Tabelle an, z. B. ihren Namen, ihr Ziel, ihr Warehouse und ihre Anweisung zur Abfrage.

  • DynamicTableResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes DynamicTable-Objekt abrufen, die dynamische Tabelle aussetzen und wieder aufnehmen und die dynamische Tabelle löschen können.

Voraussetzungen

Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.

Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Unter Verwendung des resultierenden Session-Objekts erstellt der Code ein Root-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.

Erstellen einer dynamischen Tabelle

Um eine dynamische Tabelle zu erstellen, legen Sie zunächst ein DynamicTable-Objekt an und erstellen dann ein DynamicTableCollection-Objekt aus dem API Root-Objekt. Verwenden Sie DynamicTableCollection.create, um die neue dynamische Tabelle zu Snowflake hinzuzufügen.

Der Code im folgenden Beispiel erstellt ein DynamicTable-Objekt, das eine dynamische Tabelle mit dem Namen my_dynamic_table in der my_db-Datenbank und dem my_schema-Schema darstellt, wobei die minimal erforderlichen Optionen angegeben sind:

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

Der Code erstellt eine DynamicTableCollection-Variable dynamic_tables und verwendet DynamicTableCollection.create, um eine neue dynamische Tabelle in Snowflake zu erstellen.

Der Code im folgenden Beispiel erstellt ein DynamicTable-Objekt, das eine dynamische Tabelle namens my_dynamic_table2 in der my_db-Datenbank und das my_schema-Schema mit allen derzeit möglichen Optionen darstellt:

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

Klonen einer dynamischen Tabelle

Der Code im folgenden Beispiel erstellt eine neue dynamische Tabelle namens my_dynamic_table2 mit denselben Spaltendefinitionen und allen vorhandenen Daten aus der dynamischen my_dynamic_table-Quelltabelle in der my_db-Datenbank und dem my_schema-Schema:

Bemerkung

Diese Operation des Klonens verwendet das DynamicTableClone-Objekt, das die optionalen Parameter target_lag und warehouse enthält, und unterstützt derzeit keine anderen Parameter.

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

Weitere Informationen zu dieser Funktionalität finden Sie unter CREATE DYNAMIC TABLE … CLONE.

Abrufen dynamischr Tabellendetails

Sie können Informationen über eine dynamische Tabelle erhalten, indem Sie die DynamicTableResource.fetch-Methode aufrufen, die ein DynamicTable-Objekt zurückgibt.

Der Code im folgenden Beispiel ruft Informationen über eine dynamische Tabelle namens my_dynamic_table in der my_db-Datenbank und das my_schema-Schema ab:

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

Auflistung von dynamischen Tabellen

Sie können dynamische Tabellen mit der DynamicTableCollection.iter-Methode auflisten, die einen PagedIter-Iterator von DynamicTable-Objekten zurückgibt.

Der Code im folgenden Beispiel listet dynamische Tabellen auf, deren Name mit dem Text my in der my_db-Datenbank und dem my_schema-Schema beginnt, und gibt dann den Namen jeder Tabelle aus:

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

Durchführen von Operationen in dynamischer Tabelle

Sie können gängige Operationen für dynamische Tabellen, wie z. B. das Aktualisieren, Aussetzen und Wiederaufnehmen einer dynamischen Tabelle, mit einem DynamicTableResource-Objekt durchführen.

Um einige Operationen zu demonstrieren, die Sie mit einer dynamischen Tabellenressource durchführen können, führt der Code im folgenden Beispiel Folgendes aus:

  1. Ruft das my_dynamic_table-Ressourcen-Objekt für dynamische Tabellen ab.

  2. Aktualisiert die dynamische Tabelle.

  3. Setzt die dynamische Tabelle aus.

  4. Setzt die dynamische Tabelle fort.

  5. Löscht die dynamische Tabelle.

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
Copy