Gestion des tables dynamiques Snowflake avec Python¶
Vous pouvez utiliser Python pour gérer les tables dynamiques Snowflake, qui sont un nouveau type de table pour les pipelines de traitement continu. Les tables dynamiques matérialisent les résultats d’une requête spécifique. Pour un aperçu de cette fonctionnalité, voir Tables dynamiques.
Les Snowflake Python APIs représentent des tables dynamiques avec deux types distincts :
DynamicTable
: expose les propriétés d’une table dynamique telles que son nom, sa latence cible, son entrepôt et son instruction de requête.DynamicTableResource
: expose les méthodes que vous pouvez utiliser pour extraire un objetDynamicTable
correspondant, suspendre et reprendre la table dynamique, et supprimer la table dynamique.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
à partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant l’objet Session
obtenu, le code crée un objet Root
pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.
Création d’une table dynamique¶
Pour créer une table dynamique, il faut d’abord créer un objet DynamicTable
, puis créer un objet DynamicTableCollection
à partir de l’objet Root
de l’API. En utilisant DynamicTableCollection.create
, ajoutez la nouvelle table dynamique à Snowflake.
Le code dans l’exemple suivant crée un objet DynamicTable
qui représente une table dynamique nommée my_dynamic_table
dans la base de données my_db
et le schéma my_schema
, avec les options minimales requises spécifiées :
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Ce code crée une variable DynamicTableCollection
dynamic_tables
et utilise DynamicTableCollection.create
pour créer une nouvelle table dynamique dans Snowflake.
Le code de l’exemple suivant crée un objet DynamicTable
qui représente une table dynamique nommée my_dynamic_table2
dans la base de données my_db
et le schéma my_schema
avec toutes les options actuellement possibles spécifiées :
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
Clonage d’une table dynamique¶
Le code de l’exemple suivant crée une nouvelle table dynamique nommée my_dynamic_table2
avec les mêmes définitions de colonnes et toutes les données existantes de la table dynamique source my_dynamic_table
dans la base de données my_db
et le schéma my_schema
:
Note
Cette opération de clonage utilise l’objet
DynamicTableClone
, qui inclut l’optionneltarget_lag
et les paramètreswarehouse
et ne prend actuellement pas en charge d’autres paramètres.
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
Pour plus d’informations sur cette fonctionnalité, voir CREATE DYNAMIC TABLE … CLONE.
Obtenir les détails d’une table dynamique¶
Vous pouvez obtenir des informations sur une table dynamique en appelant la méthode DynamicTableResource.fetch
, qui renvoie un objet DynamicTable
.
Le code de l’exemple suivant obtient des informations sur une table dynamique nommée my_dynamic_table
dans la base de données my_db
et le schéma my_schema
:
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Affichage de tables dynamiques¶
Vous pouvez répertorier des tables dynamiques à l’aide de la méthode DynamicTableCollection.iter
qui renvoie un itérateur PagedIter
d’objets DynamicTable
.
Le code de l’exemple suivant répertorie les tables dynamiques dont le nom commence par le texte my
dans la base de données my_db
et le schéma my_schema
, puis imprime le nom de chacun :
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
Exécution d’opérations de table dynamiques¶
Vous pouvez effectuer des opérations de table dynamiques courantes, telles que l’actualisation, la suspension et la reprise d’une table dynamique, avec un objet DynamicTableResource
.
Pour illustrer certaines opérations que vous pouvez effectuer avec une ressource de table dynamique, le code de l’exemple suivant effectue les opérations suivantes :
Extrait l’objet de ressource de table dynamique
my_dynamic_table
.Actualise la table dynamique.
Suspend la table dynamique.
Reprend la table dynamique.
Supprime la table dynamique.
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()