Gestion des tables dynamiques Snowflake avec Python

Vous pouvez utiliser Python pour gérer les tables dynamiques Snowflake, qui sont un nouveau type de table pour les pipelines de traitement continu. Les tables dynamiques matérialisent les résultats d’une requête spécifique. Pour un aperçu de cette fonctionnalité, voir Tables dynamiques.

Les Snowflake Python APIs représentent des tables dynamiques avec deux types distincts :

  • DynamicTable : expose les propriétés d’une table dynamique telles que son nom, sa latence cible, son entrepôt et son instruction de requête.

  • DynamicTableResource : expose les méthodes que vous pouvez utiliser pour extraire un objet DynamicTable correspondant, suspendre et reprendre la table dynamique, et supprimer la table dynamique.

Conditions préalables

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.

Création d’une table dynamique

Pour créer une table dynamique, il faut d’abord créer un objet DynamicTable, puis créer un objet DynamicTableCollection à partir de l’objet Root de l’API. En utilisant DynamicTableCollection.create, ajoutez la nouvelle table dynamique à Snowflake.

Le code dans l’exemple suivant crée un objet DynamicTable qui représente une table dynamique nommée my_dynamic_table dans la base de données my_db et le schéma my_schema, avec les options minimales requises spécifiées :

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

Ce code crée une variable DynamicTableCollection dynamic_tables et utilise DynamicTableCollection.create pour créer une nouvelle table dynamique dans Snowflake.

Le code de l’exemple suivant crée un objet DynamicTable qui représente une table dynamique nommée my_dynamic_table2 dans la base de données my_db et le schéma my_schema avec toutes les options actuellement possibles spécifiées :

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

Clonage d’une table dynamique

Le code de l’exemple suivant crée une nouvelle table dynamique nommée my_dynamic_table2 avec les mêmes définitions de colonnes et toutes les données existantes de la table dynamique source my_dynamic_table dans la base de données my_db et le schéma my_schema :

Note

Cette opération de clonage utilise l’objet DynamicTableClone, qui inclut l’optionnel target_lag et les paramètres warehouse et ne prend actuellement pas en charge d’autres paramètres.

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

Pour plus d’informations sur cette fonctionnalité, voir CREATE DYNAMIC TABLE … CLONE.

Obtenir les détails d’une table dynamique

Vous pouvez obtenir des informations sur une table dynamique en appelant la méthode DynamicTableResource.fetch, qui renvoie un objet DynamicTable.

Le code de l’exemple suivant obtient des informations sur une table dynamique nommée my_dynamic_table dans la base de données my_db et le schéma my_schema :

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

Affichage de tables dynamiques

Vous pouvez répertorier des tables dynamiques à l’aide de la méthode DynamicTableCollection.iter qui renvoie un itérateur PagedIter d’objets DynamicTable.

Le code de l’exemple suivant répertorie les tables dynamiques dont le nom commence par le texte my dans la base de données my_db et le schéma my_schema, puis imprime le nom de chacun :

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

Exécution d’opérations de table dynamiques

Vous pouvez effectuer des opérations de table dynamiques courantes, telles que l’actualisation, la suspension et la reprise d’une table dynamique, avec un objet DynamicTableResource.

Pour illustrer certaines opérations que vous pouvez effectuer avec une ressource de table dynamique, le code de l’exemple suivant effectue les opérations suivantes :

  1. Extrait l’objet de ressource de table dynamique my_dynamic_table.

  2. Actualise la table dynamique.

  3. Suspend la table dynamique.

  4. Reprend la table dynamique.

  5. Supprime la table dynamique.

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
Copy