Verwalten von Snowflake-Streams mit Python

Sie können Python verwenden, um Snowflake Streams zu verwalten. Dabei handelt es sich um Objekte, die Änderungen an Tabellen, einschließlich Einfügungen, Aktualisierungen und Löschungen, sowie Metadaten zu jeder Änderung aufzeichnen (DML). Weitere Informationen dazu finden Sie unter Einführung in Streams.

Bemerkung

ALTER STREAM wird derzeit nicht unterstützt.

Die Snowflake Python APIs stellt Streams mit zwei verschiedenen Typen dar:

  • Stream: Zeigt die Eigenschaften eines Streams an, z. B. seinen Namen, sein Ziel, sein Warehouse und seine Anweisung zur Abfrage.

  • StreamResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes Stream-Objekt abrufen, den Stream anhalten und fortsetzen sowie den Stream löschen können.

Voraussetzungen

Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.

Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Unter Verwendung des resultierenden Session-Objekts erstellt der Code ein Root-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.

Einen Stream erstellen

Um einen Stream zu erstellen, erstellen Sie zunächst ein Stream-Objekt und dann ein StreamCollection-Objekt aus dem API Root-Objekt. Verwenden Sie StreamCollection.create, um den neuen Stream zu Snowflake hinzuzufügen.

Sie können einen Stream mit den folgenden Objekttypen erstellen:

  • Standardtabellen

  • Ansichten

  • Verzeichnistabellen

Bei einer Quelltabelle

Der Code im folgenden Beispiel erstellt ein Stream-Objekt, das einen Stream namens my_stream_on_table auf der Quelltabelle my_table in der my_db-Datenbank und dem my_schema-Schema mit den angegebenen Stream-Eigenschaften darstellt:

Bemerkung

Der StreamSourceTable-Typ unterstützt nur Standardtabellen. Andere Arten von Tabellen - wie dynamische Tabellen, Ereignistabellen, externe Tabellen und Iceberg-Tabellen - werden derzeit nicht unterstützt.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

Der Code erstellt eine StreamCollection-Variable streams und verwendet StreamCollection.create, um einen neuen Stream in Snowflake zu erstellen.

In einer Quellansicht

Der Code im folgenden Beispiel erstellt ein Stream-Objekt, das einen Stream namens my_stream_on_view in der Quellansicht my_view in der my_db-Datenbank und dem my_schema-Schema mit den angegebenen Eigenschaften des Streams darstellt:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

In einer Tabelle eines Quellverzeichnisses

Der Code im folgenden Beispiel erstellt ein Stream-Objekt, das einen Stream namens my_stream_on_directory_table in der my_directory_table-Quellverzeichnis-Tabelle in der my_db-Datenbank und dem my_schema-Schema mit den angegebenen Stream-Eigenschaften darstellt:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

Klonen eines Streams

Der Code im folgenden Beispiel erstellt einen neuen Stream namens my_stream mit derselben Definition wie der my_other_stream-Quell-Stream in der my_db-Datenbank und dem my_schema-Schema:

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

Stream-Details abrufen

Sie können Informationen über einen Stream erhalten, indem Sie die StreamResource.fetch-Methode aufrufen, die ein Stream-Objekt zurückgibt.

Der Code im folgenden Beispiel ruft Informationen über einen Stream namens my_stream in der my_db-Datenbank und dem my_schema-Schema ab:

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

Auflistung der Streams

Sie können Streams mit der StreamCollection.iter-Methode auflisten, die einen PagedIter-Iterator von Stream-Objekten zurückgibt.

Der Code im folgenden Beispiel listet Streams auf, deren Name mit my in der my_db-Datenbank und dem my_schema-Schema beginnt, und gibt dann den Namen jedes einzelnen aus:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

Der Code im folgenden Beispiel listet ebenfalls Streams auf, deren Name mit my beginnt, verwendet aber den starts_with-Parameter anstelle von like. In diesem Beispiel wird auch der optionale show_limit=10-Parameter eingestellt, um die Anzahl der Ergebnisse auf 10 zu beschränken:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

Löschen eines Streams

Sie können einen Stream mit einem StreamResource-Objekt löschen.

Der Code im folgenden Beispiel holt das my_stream-Stream-Ressourcen-Objekt ab und löscht den Stream dann.

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy