Gestion des flux Snowflake avec Python

Vous pouvez utiliser Python pour gérer les flux Snowflake, qui sont des objets qui enregistrent les modifications de langage de manipulation de données (DML) apportées aux tables, y compris les insertions, les mises à jour et les suppressions, ainsi que les métadonnées sur chaque modification. Pour plus d’informations, voir Présentation des flux.

Note

ALTER STREAM n’est actuellement pas pris en charge.

Les Snowflake Python APIs représentent des flux avec deux types distincts :

  • Stream : expose les propriétés d’un flux telles que son nom, sa latence cible, son entrepôt et son instruction de requête.

  • StreamResource : expose les méthodes que vous pouvez utiliser pour extraire un objet Stream correspondant, suspendre et reprendre le flux, et supprimer le flux.

Conditions préalables

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.

Création d’un flux

Pour créer un flux, il faut d’abord créer un objet Stream, puis créer un objet StreamCollection à partir de l’objet Root de l’API. En utilisant StreamCollection.create, ajoutez le nouveau flux à Snowflake.

Vous pouvez créer un flux sur les types d’objets suivants :

  • Tables standards

  • Vues

  • Tables de répertoire

Sur une table source

Le code de l’exemple suivant crée un objet Stream qui représente un flux nommé my_stream_on_table sur la table source my_table dans la base de données my_db et le schéma my_schema, avec les propriétés de flux spécifiées :

Note

Le type StreamSourceTable ne prend en charge que les tables standard. Les autres types de tables, telles que les tables dynamiques, les tables d’événements, les tables externes et les tables Iceberg, ne sont actuellement pas prises en charge.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

Ce code crée une variable StreamCollection streams et utilise StreamCollection.create pour créer un nouveau flux dans Snowflake.

Sur une vue source

Le code de l’exemple suivant crée un objet Stream qui représente un flux nommé my_stream_on_view sur la vue source my_view dans la base de données my_db et le schéma my_schema, avec les propriétés de flux spécifiées :

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

Sur une table de répertoire source

Le code de l’exemple suivant crée un objet Stream qui représente un flux nommé my_stream_on_directory_table sur la table de répertoire source my_directory_table dans la base de données my_db et le schéma my_schema, avec les propriétés de flux spécifiées :

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

Clonage d’un flux

Le code de l’exemple suivant crée un nouveau flux nommé my_stream avec la même définition que le flux source my_other_stream dans la base de données my_db et le schéma my_schema :

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

Obtention d’informations sur les flux

Vous pouvez obtenir des informations sur un flux en appelant la méthode StreamResource.fetch, qui renvoie un objet Stream.

Le code de l’exemple suivant obtient des informations sur un flux nommé my_stream dans la base de données my_db et le schéma my_schema :

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

Affichage des flux

Vous pouvez répertorier des flux à l’aide de la méthode StreamCollection.iter qui renvoie un itérateur PagedIter d’objets Stream.

Le code de l’exemple suivant répertorie les flux dont le nom commence par my dans la base de données my_db et le schéma my_schema, puis imprime le nom de chacun :

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

Le code de l’exemple suivant répertorie également les flux dont le nom commence par my, mais il utilise le paramètre starts_with au lieu de like. Cet exemple définit également le paramètre facultatif show_limit=10 pour limiter le nombre de résultats à 10 :

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

Suppression d’un flux

Vous pouvez supprimer un flux avec un objet StreamResource.

Le code dans l’exemple suivant extrait l’objet de ressource de flux my_stream, puis supprime le flux.

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy