Managing Snowflake streams with Python

You can use Python to manage Snowflake streams, which are objects that record data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change. For more information, see Introduction to Streams.

Note

ALTER STREAM is currently not supported.

The Snowflake Python APIs represent streams with two separate types:

  • Stream: Exposes a stream’s properties such as its name, source object, and comment.

  • StreamResource: Exposes methods you can use to fetch a corresponding Stream object and to drop the stream.

Prerequisites

The examples in this topic assume that you’ve added code to connect with Snowflake and to create a Root object from which to use the Snowflake Python APIs.

For example, the following code uses connection parameters defined in a configuration file to create a connection to Snowflake:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)

Using the resulting Session object, the code creates a Root object to use the API’s types and methods. For more information, see Connect to Snowflake with the Snowflake Python APIs.
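
Every example in this topic reaches the stream collection through the same database and schema path, so that lookup can be wrapped in a small helper. The following is a minimal sketch: the function name get_stream_collection is hypothetical and not part of the Snowflake Python APIs, and it relies only on the root.databases[...].schemas[...].streams access path used throughout this topic:

```python
def get_stream_collection(root, database, schema):
    """Return the StreamCollection for the given database and schema.

    Hypothetical convenience helper, not part of the Snowflake Python APIs.
    `root` is expected to be the Root object created in the code above.
    """
    # Same access path as the examples in this topic, with the names as parameters.
    return root.databases[database].schemas[schema].streams
```

With the Root object above, calling get_stream_collection(root, "my_db", "my_schema") should return the same collection as root.databases['my_db'].schemas['my_schema'].streams.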

Creating a stream

To create a stream, first create a Stream object, and then create a StreamCollection object from the API Root object. Using StreamCollection.create, add the new stream to Snowflake.

You can create a stream on the following object types:

  • Standard tables

  • Views

  • Directory tables

On a source table

Note

The StreamSourceTable type only supports standard tables. Other types of tables, such as dynamic tables, event tables, external tables, and Iceberg tables, are currently not supported.

Code in the following example creates a Stream object that represents a stream named my_stream_on_table on the source table my_table in the my_db database and the my_schema schema, with the specified stream properties:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)

The code creates a StreamCollection variable streams and uses StreamCollection.create to create a new stream in Snowflake.
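
If a script might run more than once, it can help to check for an existing stream before calling create. The following is a minimal sketch under stated assumptions: create_stream_if_absent is a hypothetical helper, it uses only the StreamCollection.iter and StreamCollection.create calls shown in this topic, and it assumes that stream names are compared without regard to case. The check and the create are separate calls, so this is not safe against concurrent writers:

```python
def create_stream_if_absent(streams, stream):
    """Create `stream` unless a stream with the same name already exists.

    Hypothetical helper. `streams` is a StreamCollection and `stream` is a
    Stream object. Returns True if a stream was created, otherwise False.
    """
    wanted = stream.name.upper()
    # LIKE treats "_" and "%" as wildcards, so confirm each match by exact name.
    # Assumption: unquoted identifiers are stored in uppercase, so the
    # comparison ignores case.
    for existing in streams.iter(like=stream.name):
        if existing.name.upper() == wanted:
            return False
    streams.create(stream)
    return True
```

With the objects from the preceding example, create_stream_if_absent(streams, stream_on_table) would create the stream on the first run and return False on later runs.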

On a source view

Code in the following example creates a Stream object that represents a stream named my_stream_on_view on the source view my_view in the my_db database and the my_schema schema, with the specified stream properties:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)

On a source directory table

Code in the following example creates a Stream object that represents a stream named my_stream_on_directory_table on the source directory table my_directory_table in the my_db database and the my_schema schema, with the specified stream properties:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)

Cloning a stream

Code in the following example creates a new stream named my_stream with the same definition as the source stream my_other_stream in the my_db database and the my_schema schema:

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
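
When several streams need to be cloned in the same schema, the create call above can be repeated in a loop. The following is a minimal sketch: clone_streams is a hypothetical helper, and it assumes only the create(name, clone_stream=...) form shown in the preceding example:

```python
def clone_streams(streams, clones):
    """Clone several streams within one schema.

    Hypothetical helper. `streams` is a StreamCollection, and `clones` maps
    each new stream name to the name of the existing stream to clone.
    Returns the list of new stream names.
    """
    for new_name, source_name in clones.items():
        # Same call as in the single-stream example above.
        streams.create(new_name, clone_stream=source_name)
    return list(clones)
```

For example, clone_streams(streams, {"my_stream": "my_other_stream"}) performs the same clone as the preceding example.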

Getting stream details

You can get information about a stream by calling the StreamResource.fetch method, which returns a Stream object.

Code in the following example gets information about a stream named my_stream in the my_db database and the my_schema schema:

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
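
The dictionary returned by to_dict can be large, so it is sometimes convenient to keep only a few properties. The following is a minimal sketch: summarize_stream is a hypothetical helper, and the default key names ("name" and "comment") are assumptions based on the Stream constructor arguments used in this topic. Keys that are not present in the dictionary are skipped:

```python
def summarize_stream(streams, name, keys=("name", "comment")):
    """Fetch a stream and return only the selected properties.

    Hypothetical helper. `streams` is a StreamCollection; `keys` lists the
    entries of Stream.to_dict() to keep. Keys absent from the dictionary are
    skipped, because the exact set of keys is an assumption here.
    """
    # Indexing the collection gives a StreamResource; fetch returns a Stream.
    details = streams[name].fetch().to_dict()
    return {key: details[key] for key in keys if key in details}
```

For example, summarize_stream(root.databases['my_db'].schemas['my_schema'].streams, 'my_stream') returns a dictionary with at most the two default keys.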

Listing streams

You can list streams using the StreamCollection.iter method, which returns a PagedIter iterator of Stream objects.

Code in the following example lists streams whose name starts with my in the my_db database and the my_schema schema, and then prints the name of each:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)

Code in the following example also lists streams whose name begins with my, but it uses the starts_with parameter instead of like. This example also sets the optional parameter show_limit=10 to limit the number of results to 10:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
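
To work with the results as a plain list rather than an iterator, the names can be collected and sorted. The following is a minimal sketch: list_stream_names is a hypothetical helper built on the starts_with and show_limit parameters shown above. It assumes that the filters behave like those of the SHOW STREAMS command, where LIKE matching is case-insensitive and STARTS WITH matching is case-sensitive, so the prefix may need to match the stored case of the stream names:

```python
def list_stream_names(streams, prefix, limit=None):
    """Return the sorted names of streams whose name starts with `prefix`.

    Hypothetical helper. `streams` is a StreamCollection. When `limit` is
    given, it is passed as show_limit to cap the number of results.
    """
    if limit is None:
        results = streams.iter(starts_with=prefix)
    else:
        results = streams.iter(starts_with=prefix, show_limit=limit)
    # Each item is a Stream object; keep only its name.
    return sorted(stream_obj.name for stream_obj in results)
```

For example, list_stream_names(root.databases['my_db'].schemas['my_schema'].streams, "my", limit=10) collects the names from the preceding example into a sorted list.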

Dropping a stream

You can drop a stream with a StreamResource object.

Code in the following example gets the StreamResource object for the my_stream stream in the my_db database and the my_schema schema, and then drops the stream:

my_stream_res = root.databases['my_db'].schemas['my_schema'].streams["my_stream"]
my_stream_res.drop()
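
To drop several streams in one schema, the same StreamResource.drop call can be applied to each name in turn. The following is a minimal sketch: drop_streams is a hypothetical helper, and it assumes only that indexing a StreamCollection by name yields a StreamResource. It stops at the first error, so any streams dropped before the failure stay dropped:

```python
def drop_streams(streams, names):
    """Drop each named stream and return the names that were dropped.

    Hypothetical helper. `streams` is a StreamCollection; indexing it by
    name yields a StreamResource, whose drop method removes the stream.
    """
    dropped = []
    for name in names:
        streams[name].drop()
        dropped.append(name)
    return dropped
```

For example, drop_streams(root.databases['my_db'].schemas['my_schema'].streams, ["my_stream", "my_stream_on_view"]) drops both streams and returns their names.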