snowflake.core.stream.StreamCollection¶

class snowflake.core.stream.StreamCollection(schema: SchemaResource)¶

Bases: StreamCollectionBase

Represents the collection operations on the Snowflake Stream resource.

With this collection, you can create, iterate through, and fetch streams that you have access to in the current context.

Examples

Creating a stream instance:

>>> streams = root.databases["my_db"].schemas["my_schema"].streams
>>> streams.create(
...     Stream(
...         name="my_stream",
...         stream_source=StreamSourceTable(name="my_table", append_only=True, show_initial_rows=False),
...     ),
...     mode=CreateMode.error_if_exists,
... )

Attributes

database¶

The DatabaseResource this collection belongs to.

root¶

The Root object this collection belongs to.

Methods

create(stream: str, *, clone_stream: str | Clone, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) → StreamResource¶
create(stream: Stream, *, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) → StreamResource

Create a stream in Snowflake.

There are two ways to create a stream: by cloning or by building from scratch.

Cloning an existing stream

Parameters:
  • stream (str) – The new stream’s name

  • clone_stream (str or Clone object) – The name of the stream to be cloned, or a Clone object describing it

  • mode (CreateMode, optional) –

    One of the following enum values:

    CreateMode.error_if_exists: Throw a snowflake.core.exceptions.ConflictError if the stream already exists in Snowflake. Equivalent to SQL create stream <name> ....

    CreateMode.or_replace: Replace the stream if it already exists in Snowflake. Equivalent to SQL create or replace stream <name> ....

    CreateMode.if_not_exists: Do nothing if the stream already exists in Snowflake. Equivalent to SQL create stream if not exists <name> ....

    Default is CreateMode.error_if_exists.

  • copy_grants (bool, optional) – Whether to enable copy grants when creating the object. Default is False.
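As a rough illustration of how the three modes relate to SQL, the sketch below maps each mode to the opening of the statement it corresponds to. `CreateModeSketch` and `create_stream_prefix` are hypothetical stand-ins written for this example, not part of snowflake.core, and the library itself does not necessarily build SQL this way. Note that in Snowflake SQL the IF NOT EXISTS clause comes before the stream name.

```python
from enum import Enum


class CreateModeSketch(Enum):
    """Hypothetical stand-in for snowflake.core.CreateMode (illustration only)."""

    error_if_exists = "error_if_exists"
    or_replace = "or_replace"
    if_not_exists = "if_not_exists"


def create_stream_prefix(mode: CreateModeSketch, name: str) -> str:
    """Return the opening of the SQL statement that a mode corresponds to."""
    if mode is CreateModeSketch.or_replace:
        return f"CREATE OR REPLACE STREAM {name}"
    if mode is CreateModeSketch.if_not_exists:
        return f"CREATE STREAM IF NOT EXISTS {name}"
    # error_if_exists: a plain CREATE, which fails when the stream exists
    return f"CREATE STREAM {name}"


for mode in CreateModeSketch:
    print(f"{mode.name}: {create_stream_prefix(mode, 'my_stream')} ...")
```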

Examples

Cloning a Stream instance:

>>> streams = schema.streams
>>> streams.create(
...     "new_stream_name",
...     clone_stream="stream_name_to_be_cloned",
...     mode=CreateMode.if_not_exists,
...     copy_grants=True,
... )

Cloning a Stream instance in a different database and schema:

>>> streams = schema.streams
>>> streams.create(
...     "new_stream_name",
...     clone_stream="stream_database_name.stream_schema_name.stream_name_to_be_cloned",
...     mode=CreateMode.if_not_exists,
...     copy_grants=True,
... )

Creating a stream from scratch

Parameters:
  • stream (Stream) – The Stream object that describes the stream to create. Its properties are name, comment (optional), and stream_source, a StreamSource object that is one of StreamSourceStage, StreamSourceTable, or StreamSourceView.

  • mode (CreateMode, optional) –

    One of the following enum values:

    CreateMode.error_if_exists: Throw a snowflake.core.exceptions.ConflictError if the stream already exists in Snowflake. Equivalent to SQL create stream <name> ....

    CreateMode.or_replace: Replace the stream if it already exists in Snowflake. Equivalent to SQL create or replace stream <name> ....

    CreateMode.if_not_exists: Do nothing if the stream already exists in Snowflake. Equivalent to SQL create stream if not exists <name> ....

    Default is CreateMode.error_if_exists.

  • copy_grants (bool, optional) – Whether to enable copy grants when creating the object. Default is False.

Examples

Creating a stream instance by source table:

>>> streams.create(
...     Stream(
...         name="new_stream_name",
...         comment="create stream by table",
...         stream_source=StreamSourceTable(
...             point_of_time=PointOfTimeOffset(reference="before", offset="1"),
...             name="my_source_table_name",
...             append_only=True,
...             show_initial_rows=False,
...         ),
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True,
... )

Creating a stream instance by source view:

>>> streams.create(
...     Stream(
...         name="new_stream_name",
...         stream_source=StreamSourceView(
...             point_of_time=PointOfTimeOffset(reference="before", offset="1"),
...             name="my_source_view_name",
...         ),
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True,
... )

Creating a stream instance by source directory table:

>>> streams.create(
...     Stream(
...         name="new_stream_name",
...         stream_source=StreamSourceStage(
...             point_of_time=PointOfTimeOffset(reference="before", offset="1"),
...             name="my_source_directory_table_name",
...         ),
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True,
... )
create_async(stream: str, *, clone_stream: str | Clone, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) → PollingOperation['StreamResource']¶
create_async(stream: Stream, *, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) → PollingOperation['StreamResource']

An asynchronous version of create().

Refer to PollingOperation for more information on asynchronous execution and the return type.
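A minimal sketch of the asynchronous pattern follows. `create_stream_and_wait` is a hypothetical helper written for this example. It assumes that the returned PollingOperation exposes a `result()` method that blocks until the request completes and returns the StreamResource; confirm this against the PollingOperation reference before relying on it.

```python
def create_stream_and_wait(streams, stream):
    """Start stream creation in the background, then wait for it to finish.

    `streams` is expected to be a StreamCollection and `stream` a Stream
    model; both are supplied by the caller.
    """
    # create_async() returns immediately with a PollingOperation.
    operation = streams.create_async(stream)

    # Other work can run here while Snowflake processes the request.

    # Assumption: result() blocks until completion and returns the
    # StreamResource for the new stream.
    return operation.result()
```

With a live session, a call might look like `create_stream_and_wait(schema.streams, Stream(name="my_stream", stream_source=...))`, where the stream source is filled in as in the create() examples.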

items() → ItemsView[str, T]¶

iter(*, like: str | None = None, starts_with: str | None = None, limit: int | None = None, from_name: str | None = None, show_limit: int | None = None) → Iterator[Stream]¶

List streams.

Parameters:
  • like (str) – Parameter to filter the command output by resource name. Uses case-insensitive pattern matching, with support for SQL wildcard characters.

  • starts_with (str) – Parameter to filter the command output based on the string of characters that appear at the beginning of the object name. Uses case-sensitive pattern matching.

  • limit (int) – Parameter to limit the maximum number of rows returned by a command.

  • from_name (str) – Parameter to enable fetching rows only following the first row whose object name matches the specified string. Case-sensitive and does not have to be the full name.
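The filter semantics described above can be modeled locally. The sketch below is a plain-Python approximation, not the library's implementation: `like_to_regex` and `filter_names` are hypothetical helpers, the real filtering happens on the Snowflake side, and `from_name` is left out because its exact cursor behavior is not spelled out here.

```python
import re
from itertools import islice
from typing import Iterable, Iterator, Optional


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate SQL wildcards (% and _) into a case-insensitive regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches any run of characters
        elif ch == "_":
            parts.append(".")  # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.IGNORECASE | re.DOTALL)


def filter_names(
    names: Iterable[str],
    *,
    like: Optional[str] = None,
    starts_with: Optional[str] = None,
    limit: Optional[int] = None,
) -> Iterator[str]:
    """Apply like (case-insensitive), starts_with (case-sensitive), and limit."""
    selected = iter(names)
    if like is not None:
        rx = like_to_regex(like)
        selected = (n for n in selected if rx.fullmatch(n))
    if starts_with is not None:
        selected = (n for n in selected if n.startswith(starts_with))
    return islice(selected, limit)  # limit=None means no cap


names = ["MY_STREAM", "my_other_stream", "ORDERS_STREAM", "MY_TABLE_STREAM"]
print(list(filter_names(names, like="my%")))
print(list(filter_names(names, starts_with="MY")))
print(list(filter_names(names, like="%stream", limit=2)))
```

The first call keeps all three names that begin with "my" in any letter case, while the second drops "my_other_stream" because `starts_with` is case-sensitive.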

iter_async(*, like: str | None = None, starts_with: str | None = None, limit: int | None = None, from_name: str | None = None, show_limit: int | None = None) → PollingOperation[Iterator[Stream]]¶

An asynchronous version of iter().

Refer to PollingOperation for more information on asynchronous execution and the return type.

keys() → KeysView[str]¶

update_reference(old_name: str, new_name: str, resource: T) → None¶

Update the collection with a new item.

values() → ValuesView[T]¶