Gestion des flux Snowflake avec Python¶
Vous pouvez utiliser Python pour gérer les flux Snowflake, qui sont des objets qui enregistrent les modifications de langage de manipulation de données (DML) apportées aux tables, y compris les insertions, les mises à jour et les suppressions, ainsi que les métadonnées sur chaque modification. Pour plus d’informations, voir Présentation des flux.
Note
ALTER STREAM n’est actuellement pas pris en charge.
Les Snowflake Python APIs représentent des flux avec deux types distincts :
Stream
: expose les propriétés d’un flux telles que son nom, sa latence cible, son entrepôt et son instruction de requête.StreamResource
: expose les méthodes que vous pouvez utiliser pour extraire un objetStream
correspondant, suspendre et reprendre le flux, et supprimer le flux.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
à partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant l’objet Session
obtenu, le code crée un objet Root
pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.
Création d’un flux¶
Pour créer un flux, il faut d’abord créer un objet Stream
, puis créer un objet StreamCollection
à partir de l’objet Root
de l’API. En utilisant StreamCollection.create
, ajoutez le nouveau flux à Snowflake.
Vous pouvez créer un flux sur les types d’objets suivants :
Tables standards
Vues
Tables de répertoire
Sur une table source¶
Le code de l’exemple suivant crée un objet Stream
qui représente un flux nommé my_stream_on_table
sur la table source my_table
dans la base de données my_db
et le schéma my_schema
, avec les propriétés de flux spécifiées :
Note
Le type StreamSourceTable
ne prend en charge que les tables standard. Les autres types de tables, telles que les tables dynamiques, les tables d’événements, les tables externes et les tables Iceberg, ne sont actuellement pas prises en charge.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Ce code crée une variable StreamCollection
streams
et utilise StreamCollection.create
pour créer un nouveau flux dans Snowflake.
Sur une vue source¶
Le code de l’exemple suivant crée un objet Stream
qui représente un flux nommé my_stream_on_view
sur la vue source my_view
dans la base de données my_db
et le schéma my_schema
, avec les propriétés de flux spécifiées :
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Sur une table de répertoire source¶
Le code de l’exemple suivant crée un objet Stream
qui représente un flux nommé my_stream_on_directory_table
sur la table de répertoire source my_directory_table
dans la base de données my_db
et le schéma my_schema
, avec les propriétés de flux spécifiées :
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Clonage d’un flux¶
Le code de l’exemple suivant crée un nouveau flux nommé my_stream
avec la même définition que le flux source my_other_stream
dans la base de données my_db
et le schéma my_schema
:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Obtention d’informations sur les flux¶
Vous pouvez obtenir des informations sur un flux en appelant la méthode StreamResource.fetch
, qui renvoie un objet Stream
.
Le code de l’exemple suivant obtient des informations sur un flux nommé my_stream
dans la base de données my_db
et le schéma my_schema
:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Affichage des flux¶
Vous pouvez répertorier des flux à l’aide de la méthode StreamCollection.iter
qui renvoie un itérateur PagedIter
d’objets Stream
.
Le code de l’exemple suivant répertorie les flux dont le nom commence par my
dans la base de données my_db
et le schéma my_schema
, puis imprime le nom de chacun :
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
Le code de l’exemple suivant répertorie également les flux dont le nom commence par my
, mais il utilise le paramètre starts_with
au lieu de like
. Cet exemple définit également le paramètre facultatif show_limit=10
pour limiter le nombre de résultats à 10
:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
Suppression d’un flux¶
Vous pouvez supprimer un flux avec un objet StreamResource
.
Le code dans l’exemple suivant extrait l’objet de ressource de flux my_stream
, puis supprime le flux.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()