Verwalten von Snowflake-Streams mit Python¶
Sie können Python verwenden, um Snowflake Streams zu verwalten. Dabei handelt es sich um Objekte, die Änderungen an Tabellen, einschließlich Einfügungen, Aktualisierungen und Löschungen, sowie Metadaten zu jeder Änderung aufzeichnen (DML). Weitere Informationen dazu finden Sie unter Einführung in Streams.
Bemerkung
ALTER STREAM wird derzeit nicht unterstützt.
Die Snowflake Python APIs stellt Streams mit zwei verschiedenen Typen dar:
Stream
: Zeigt die Eigenschaften eines Streams an, z. B. seinen Namen, sein Ziel, sein Warehouse und seine Anweisung zur Abfrage.StreamResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesStream
-Objekt abrufen, den Stream anhalten und fortsetzen sowie den Stream löschen können.
Voraussetzungen¶
Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root
-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.
Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Unter Verwendung des resultierenden Session
-Objekts erstellt der Code ein Root
-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.
Einen Stream erstellen¶
Um einen Stream zu erstellen, erstellen Sie zunächst ein Stream
-Objekt und dann ein StreamCollection
-Objekt aus dem API Root
-Objekt. Verwenden Sie StreamCollection.create
, um den neuen Stream zu Snowflake hinzuzufügen.
Sie können einen Stream mit den folgenden Objekttypen erstellen:
Standardtabellen
Ansichten
Verzeichnistabellen
Bei einer Quelltabelle¶
Der Code im folgenden Beispiel erstellt ein Stream
-Objekt, das einen Stream namens my_stream_on_table
auf der Quelltabelle my_table
in der my_db
-Datenbank und dem my_schema
-Schema mit den angegebenen Stream-Eigenschaften darstellt:
Bemerkung
Der StreamSourceTable
-Typ unterstützt nur Standardtabellen. Andere Arten von Tabellen - wie dynamische Tabellen, Ereignistabellen, externe Tabellen und Iceberg-Tabellen - werden derzeit nicht unterstützt.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Der Code erstellt eine StreamCollection
-Variable streams
und verwendet StreamCollection.create
, um einen neuen Stream in Snowflake zu erstellen.
In einer Quellansicht¶
Der Code im folgenden Beispiel erstellt ein Stream
-Objekt, das einen Stream namens my_stream_on_view
in der Quellansicht my_view
in der my_db
-Datenbank und dem my_schema
-Schema mit den angegebenen Eigenschaften des Streams darstellt:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
In einer Tabelle eines Quellverzeichnisses¶
Der Code im folgenden Beispiel erstellt ein Stream
-Objekt, das einen Stream namens my_stream_on_directory_table
in der my_directory_table
-Quellverzeichnis-Tabelle in der my_db
-Datenbank und dem my_schema
-Schema mit den angegebenen Stream-Eigenschaften darstellt:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Klonen eines Streams¶
Der Code im folgenden Beispiel erstellt einen neuen Stream namens my_stream
mit derselben Definition wie der my_other_stream
-Quell-Stream in der my_db
-Datenbank und dem my_schema
-Schema:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Stream-Details abrufen¶
Sie können Informationen über einen Stream erhalten, indem Sie die StreamResource.fetch
-Methode aufrufen, die ein Stream
-Objekt zurückgibt.
Der Code im folgenden Beispiel ruft Informationen über einen Stream namens my_stream
in der my_db
-Datenbank und dem my_schema
-Schema ab:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Auflistung der Streams¶
Sie können Streams mit der StreamCollection.iter
-Methode auflisten, die einen PagedIter
-Iterator von Stream
-Objekten zurückgibt.
Der Code im folgenden Beispiel listet Streams auf, deren Name mit my
in der my_db
-Datenbank und dem my_schema
-Schema beginnt, und gibt dann den Namen jedes einzelnen aus:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
Der Code im folgenden Beispiel listet ebenfalls Streams auf, deren Name mit my
beginnt, verwendet aber den starts_with
-Parameter anstelle von like
. In diesem Beispiel wird auch der optionale show_limit=10
-Parameter eingestellt, um die Anzahl der Ergebnisse auf 10
zu beschränken:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
Löschen eines Streams¶
Sie können einen Stream mit einem StreamResource
-Objekt löschen.
Der Code im folgenden Beispiel holt das my_stream
-Stream-Ressourcen-Objekt ab und löscht den Stream dann.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()